From: <i7...@us...> - 2009-04-29 14:45:14
|
Revision: 5742 http://octave.svn.sourceforge.net/octave/?rev=5742&view=rev Author: i7tiol Date: 2009-04-29 14:45:11 +0000 (Wed, 29 Apr 2009) Log Message: ----------- Minor fixes. Removed dependency on package general to retain usability with Octave-3.0.. Modified Paths: -------------- trunk/octave-forge/main/parallel/DESCRIPTION trunk/octave-forge/main/parallel/INDEX trunk/octave-forge/main/parallel/doc/README.bw trunk/octave-forge/main/parallel/inst/__bw_scheduler__.m trunk/octave-forge/main/parallel/inst/bw_list.m trunk/octave-forge/main/parallel/src/Makefile Added Paths: ----------- trunk/octave-forge/main/parallel/src/__internal_exit__.cc Modified: trunk/octave-forge/main/parallel/DESCRIPTION =================================================================== --- trunk/octave-forge/main/parallel/DESCRIPTION 2009-04-29 12:03:22 UTC (rev 5741) +++ trunk/octave-forge/main/parallel/DESCRIPTION 2009-04-29 14:45:11 UTC (rev 5742) @@ -8,7 +8,6 @@ parallel execution on a single machine see e.g. function parcellfun (author: Jaroslav Hajek) in package general. Depends: octave (>= 3.0.0) -Depends: general (>= 1.1.0) Autoload: yes License: GPL, see individual files for GPL version Url: http://octave.sf.net Modified: trunk/octave-forge/main/parallel/INDEX =================================================================== --- trunk/octave-forge/main/parallel/INDEX 2009-04-29 12:03:22 UTC (rev 5741) +++ trunk/octave-forge/main/parallel/INDEX 2009-04-29 14:45:11 UTC (rev 5742) @@ -15,6 +15,7 @@ __bw_prcv__ __bw_scheduler__ __bw_computing_machine__ + __internal_exit__ General select User Modified: trunk/octave-forge/main/parallel/doc/README.bw =================================================================== --- trunk/octave-forge/main/parallel/doc/README.bw 2009-04-29 12:03:22 UTC (rev 5741) +++ trunk/octave-forge/main/parallel/doc/README.bw 2009-04-29 14:45:11 UTC (rev 5742) @@ -36,12 +36,12 @@ a cell-array, see below). "results" of course may be a structure or cell array too to accomodate more than one value. -For each set of arguments, the function is run at a different one of -the currently available machines. The user supplies a one-dimensional -cell-array with one set of arguments (i.e. the value of "args") in -each entry. The cell-array must be stored in a file under the data -directory (see below) and remain there until computation is finished -(for the case the scheduler needs restarting). +For each set of arguments, execution of the function will be scheduled +to one of the currently available machines. The user supplies a +one-dimensional cell-array with one set of arguments (i.e. the value +of "args") in each entry. The cell-array must be stored in a file +under the data directory (see below) and remain there until +computation is finished (for the case the scheduler needs restarting). The current state is kept in a variable "state" saved to a file whose name is sprintf("%s-%s.state", functionname, argumentsfilename) within Modified: trunk/octave-forge/main/parallel/inst/__bw_scheduler__.m =================================================================== --- trunk/octave-forge/main/parallel/inst/__bw_scheduler__.m 2009-04-29 12:03:22 UTC (rev 5741) +++ trunk/octave-forge/main/parallel/inst/__bw_scheduler__.m 2009-04-29 14:45:11 UTC (rev 5742) @@ -45,19 +45,28 @@ source (userrc); endif - ## some preparation of configuration - ssh_opt_str = sprintf ("-o ConnectTimeout=%i", \ - max (round (connect_timeout), 0)); + ## filenames + stfn = fullfile (state_dir, sprintf ("%s-%s.state", f, argfile)); + pidfn = fullfile (state_dir, sprintf ("%s-%s.pid", f, argfile)); + lfn = fullfile (state_dir, sprintf ("%s-%s.lock", f, argfile)); ## read arguments - args = __bw_load_variable__ (fullfile (data_dir, argfile)); + try + args = __bw_load_variable__ (fullfile (data_dir, argfile)); + catch + state.scheduler_msg = lasterr; + __bw_secure_save__ (stfn, state, "state"); + return; + end_try_catch + if (! iscell (args) || (rows (args) > 1 && columns (args) > 1)) + state.scheduler_msg = "arguments no one-dimensional array\n"; + __bw_secure_save__ (stfn, state, "state"); + return; + endif n_args = length (args); ## racing condition, but might avoid some blocked schedulers - if (__bw_is_locked__ (lfn = \ - fullfile \ - (state_dir, \ - sprintf ("%s-%s.lock", f, argfile)))) + if (__bw_is_locked__ (lfn)) return; endif @@ -65,7 +74,6 @@ lfd = __bw_lock_file__ (lfn); ## note pid in an extra file - pidfn = fullfile (state_dir, sprintf ("%s-%s.pid", f, argfile)); if ((pidfd = fopen (pidfn, "w")) < 0) return; endif @@ -73,9 +81,7 @@ fclose (pidfd); ## initialize state - if (isempty (stat (stfn = \ - fullfile (state_dir, \ - sprintf ("%s-%s.state", f, argfile))))) + if (isempty (stat (stfn))) state.results = state.msg = cell (n_args, 1); state.active = state.ready = state.error = zeros (n_args, 1); state.scheduler_msg = ""; @@ -99,11 +105,13 @@ ## initialize machine information m_n = length (computing_machines); - m_njobs = m_unresponsive = m_active = pids = pipesr = \ + m_njobs = m_irresponsive = m_active = pids = pipesr = \ pipesw = m_just_tried = zeros (m_n, 1); ## fork one permanent subprocess per machine cmd = "ssh"; + ssh_timeout_opt_str = sprintf ("-o ConnectTimeout=%i", \ + max (round (connect_timeout), 0)); for id = 1:m_n [pdrp, pdwc, err, msg] = pipe (); if (err) @@ -125,7 +133,8 @@ pclose (pdwp); fcntl (pdrc, F_SETFL, O_SYNC); while (true) # no break, process killed from parent process - cmd_args = {ssh_opt_str, \ + cmd_args = {"-o PasswordAuthentication=no", \ + ssh_timeout_opt_str, \ computing_machines{id}, \ "octave", \ "-q", \ @@ -188,7 +197,7 @@ ## not reached, but left here lest they are needed sometime pclose (pdrc); pclose (pdwc); - __exit__ (0); + __internal_exit__ (0); elseif (pids(id) > 0) # parent process pclose (pdwc); pclose (pdrc); @@ -209,10 +218,10 @@ while (! all (state.ready)) args_unused = find (! (state.ready | state.active)); m_free = cat (1, \ - find (! (m_active | m_unresponsive)), \ + find (! (m_active | m_irresponsive)), \ find ((! (m_active | m_just_tried)) & \ - m_unresponsive), \ - find ((! m_active) & m_just_tried & m_unresponsive)); + m_irresponsive), \ + find ((! m_active) & m_just_tried & m_irresponsive)); ## there should always be free childs here, give them a task for id = 1:min (length (m_free), length (args_unused)) ## tell child to use this argument @@ -238,7 +247,7 @@ end_try_catch switch res case 0 # success - m_unresponsive(id) = false; + m_irresponsive(id) = false; m_just_tried(id) = false; m_njobs(id)++; state.active(m_active(id)) = false; @@ -255,11 +264,11 @@ m_active(id) = 0; case 1 # computing machine (got) unreachable state.active(m_active(id)) = false; - m_unresponsive(id) = true; + m_irresponsive(id) = true; m_just_tried(id) = true; m_active(id) = 0; case 2 # computing function returned error - m_unresponsive(id) = false; + m_irresponsive(id) = false; m_just_tried(id) = false; m_njobs(id)++; state.active(m_active(id)) = false; @@ -284,13 +293,13 @@ endfor ## update statefile, but not if last update was a short time ago state.machines.active = m_active; - state.machines.unresponsive = m_unresponsive; + state.machines.irresponsive = m_irresponsive; state.machines.njobs = m_njobs; if ((tp = time) - last_saved >= min_save_interv) __bw_secure_save__ (stfn, state, "state"); last_saved = tp; endif - if (all ((m_just_tried | ! m_unresponsive)(! m_active))) + if (all ((m_just_tried | ! m_irresponsive)(! m_active))) m_just_tried = zeros (m_n, 1); endif endwhile Modified: trunk/octave-forge/main/parallel/inst/bw_list.m =================================================================== --- trunk/octave-forge/main/parallel/inst/bw_list.m 2009-04-29 12:03:22 UTC (rev 5741) +++ trunk/octave-forge/main/parallel/inst/bw_list.m 2009-04-29 14:45:11 UTC (rev 5742) @@ -57,7 +57,7 @@ nr = length (find (state.ready)); printf ("%i/%i, ", nr, n); if (isfield (state, "machines")) - printf ("%i", length (find (! state.machines.unresponsive))) + printf ("%i", length (find (! state.machines.irresponsive))) else printf ("0"); endif Modified: trunk/octave-forge/main/parallel/src/Makefile =================================================================== --- trunk/octave-forge/main/parallel/src/Makefile 2009-04-29 12:03:22 UTC (rev 5741) +++ trunk/octave-forge/main/parallel/src/Makefile 2009-04-29 14:45:11 UTC (rev 5742) @@ -2,7 +2,7 @@ recv.oct reval.oct send.oct \ __bw_is_locked__.oct __bw_lock_file__.oct \ __bw_unlock_file__.oct \ - __bw_prcv__.oct __bw_psend__.oct \ + __bw_prcv__.oct __bw_psend__.oct __internal_exit__.oct \ select.oct Added: trunk/octave-forge/main/parallel/src/__internal_exit__.cc =================================================================== --- trunk/octave-forge/main/parallel/src/__internal_exit__.cc (rev 0) +++ trunk/octave-forge/main/parallel/src/__internal_exit__.cc 2009-04-29 14:45:11 UTC (rev 5742) @@ -0,0 +1,37 @@ +// Copyright (C) 2008 Olaf Till <ola...@un...> + +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation; either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + + +#include <octave/oct.h> + +#include <unistd.h> +#include <signal.h> + +// This function duplicates __exit__.cc to remove the dependency on +// the package main/general until the current Octave-3.1... will be +// the stable version. + +DEFUN_DLD (__internal_exit__, args, , + "-*- texinfo -*-\n\ +@deftypefn {Loadable Function} __exit__ (status)\n\ +This is a wrapper over the POSIX _exit() system call. Calling this function\n\ +will terminate the running process immediatelly, bypassing normal Octave\n\ +terminating sequence. It is suitable to terminate a forked process. It\n\ +should be considered expert-only and not to be used in normal code.\n\ +@end deftypefn") +{ + _exit (args.length () > 0 ? args(0).int_value () : 0); +} This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <i7...@us...> - 2009-08-19 08:30:30
|
Revision: 6116 http://octave.svn.sourceforge.net/octave/?rev=6116&view=rev Author: i7tiol Date: 2009-08-19 08:30:19 +0000 (Wed, 19 Aug 2009) Log Message: ----------- parallel/src/pserver.cc: fix quitting_gracefully, make extern Modified Paths: -------------- trunk/octave-forge/main/parallel/DESCRIPTION trunk/octave-forge/main/parallel/src/pserver.cc Modified: trunk/octave-forge/main/parallel/DESCRIPTION =================================================================== --- trunk/octave-forge/main/parallel/DESCRIPTION 2009-08-19 05:01:22 UTC (rev 6115) +++ trunk/octave-forge/main/parallel/DESCRIPTION 2009-08-19 08:30:19 UTC (rev 6116) @@ -1,6 +1,6 @@ Name: Parallel -Version: 2.0.0 -Date: 2009-03-29 +Version: 2.0.1 +Date: 2009-08-19 Author: Hayato Fujiwara and Olaf Till <ola...@un...> Maintainer: Hayato Fujiwara and Olaf Till <ola...@un...> Title: Parallel Computing. Modified: trunk/octave-forge/main/parallel/src/pserver.cc =================================================================== --- trunk/octave-forge/main/parallel/src/pserver.cc 2009-08-19 05:01:22 UTC (rev 6115) +++ trunk/octave-forge/main/parallel/src/pserver.cc 2009-08-19 08:30:19 UTC (rev 6116) @@ -61,8 +61,6 @@ // SSIZE_MAX might be for 64-bit. Limit to 2^31-1 #define BUFF_SIZE 2147483647 -static bool quitting_gracefully = false; - // Handle server SIGTERM SIGQUIT static RETSIGTYPE @@ -99,6 +97,7 @@ do_octave_atexit_server (void) { static bool deja_vu = false; + extern bool quitting_gracefully; while (! octave_atexit_functions.empty ()) { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <i7...@us...> - 2009-08-20 13:40:20
|
Revision: 6118 http://octave.svn.sourceforge.net/octave/?rev=6118&view=rev Author: i7tiol Date: 2009-08-20 13:40:14 +0000 (Thu, 20 Aug 2009) Log Message: ----------- parallel: some fixes for Octave-3.2.., removed unneeded code in __bw_psend__.cc and __bw_prcv__.cc Modified Paths: -------------- trunk/octave-forge/main/parallel/inst/__bw_computing_machine__.m trunk/octave-forge/main/parallel/inst/__bw_scheduler__.m trunk/octave-forge/main/parallel/src/__bw_prcv__.cc trunk/octave-forge/main/parallel/src/__bw_psend__.cc trunk/octave-forge/main/parallel/src/pserver.cc Modified: trunk/octave-forge/main/parallel/inst/__bw_computing_machine__.m =================================================================== --- trunk/octave-forge/main/parallel/inst/__bw_computing_machine__.m 2009-08-19 20:32:55 UTC (rev 6117) +++ trunk/octave-forge/main/parallel/inst/__bw_computing_machine__.m 2009-08-20 13:40:14 UTC (rev 6118) @@ -46,11 +46,11 @@ end_try_catch if (err) - __bw_psend__ (stdout, 2, true); - __bw_psend__ (stdout, msg, true); + __bw_psend__ (stdout, 2); + __bw_psend__ (stdout, msg); else - __bw_psend__ (stdout, 0, true); - __bw_psend__ (stdout, res, true); + __bw_psend__ (stdout, 0); + __bw_psend__ (stdout, res); endif fflush (stdout); Modified: trunk/octave-forge/main/parallel/inst/__bw_scheduler__.m =================================================================== --- trunk/octave-forge/main/parallel/inst/__bw_scheduler__.m 2009-08-19 20:32:55 UTC (rev 6117) +++ trunk/octave-forge/main/parallel/inst/__bw_scheduler__.m 2009-08-20 13:40:14 UTC (rev 6118) @@ -147,8 +147,8 @@ [pdw, pdr, pid] = popen2 (cmd, cmd_args, 1); while (true) # break if eof on pdr arg_id = __bw_prcv__ (pdrc).psend_var; - __bw_psend__ (pdw, args{arg_id}, true); - __bw_psend__ (pdw, arg_id, true); + __bw_psend__ (pdw, args{arg_id}); + __bw_psend__ (pdw, arg_id); fflush (pdw); try if (ismatrix (tp = __bw_prcv__ (pdr))) @@ -167,8 +167,8 @@ break; end_try_catch tp = tp.psend_var; - __bw_psend__ (pdwc, 2, true); - __bw_psend__ (pdwc, tp, true); + __bw_psend__ (pdwc, 2); + __bw_psend__ (pdwc, tp); fflush (pdwc); else # success try @@ -179,15 +179,15 @@ break; end_try_catch tp = tp.psend_var; - __bw_psend__ (pdwc, 0, true); - __bw_psend__ (pdwc, tp, true); + __bw_psend__ (pdwc, 0); + __bw_psend__ (pdwc, tp); fflush (pdwc); endif endwhile waitpid (pid); pclose (pdr); pclose (pdw); - __bw_psend__ (pdwc, 1, true); # computing machine (got) unreachable + __bw_psend__ (pdwc, 1); # computing machine (got) unreachable fflush (pdwc); if ((rest = connect_timeout + constart - time) > 0) pause (rest); @@ -225,7 +225,7 @@ ## there should always be free childs here, give them a task for id = 1:min (length (m_free), length (args_unused)) ## tell child to use this argument - __bw_psend__ (pipesw(m_free(id)), args_unused(id), true); + __bw_psend__ (pipesw(m_free(id)), args_unused(id)); fflush (pipesw(m_free(id))); ## mark child active (busy) and note argument of machine m_active(m_free(id)) = args_unused(id); Modified: trunk/octave-forge/main/parallel/src/__bw_prcv__.cc =================================================================== --- trunk/octave-forge/main/parallel/src/__bw_prcv__.cc 2009-08-19 20:32:55 UTC (rev 6117) +++ trunk/octave-forge/main/parallel/src/__bw_prcv__.cc 2009-08-20 13:40:14 UTC (rev 6118) @@ -20,7 +20,7 @@ #include <octave/oct-stream.h> #include <octave/oct-map.h> -DEFUN_DLD (__bw_prcv__, args, nargout, "prcv (pd)\n\ +DEFUN_DLD (__bw_prcv__, args, nargout, "__bw_prcv__ (pd)\n\ Reads one variable from pipe stream 'pd'.\n\ The variable must have been coded in Octaves binary format,\n\ including a header. This can be done by 'psend ()'.\n\ @@ -29,8 +29,8 @@ call 'feof ()' afterwards. If EOF is met later in reading,\n\ it causes an error.\n\ Normally, a structure is returned with the variable under its name\n\ -in a single field. With no output arguments, the variable is installed\n\ -into memory.\n\ +in a single field. Originally, with no output arguments, the variable was\n\ +installed into memory, but this has been disabled.\n\ \n\ This function may change and is internal to the parallel package.\n") { @@ -38,10 +38,10 @@ Octave_map retstruct; if (args.length () != 1) { - error ("prcv: exactly one argument required\n"); + error ("__bw_prcv__: exactly one argument required\n"); return retval; } - octave_stream is = octave_stream_list::lookup (args(0), "prcv"); + octave_stream is = octave_stream_list::lookup (args(0), "__bw_prcv__"); if (error_state) return retval; if (is.is_open ()) { @@ -49,7 +49,7 @@ // 114: "r", 43: "+" if (! strchr (mode.c_str (), 114) && ! strchr (mode.c_str (), 43)) { - error ("prcv: stream not readable\n"); + error ("__bw_prcv__: stream not readable\n"); return retval; } #ifdef PATCHED_PIPE_CODE @@ -59,13 +59,13 @@ // 98: "b" if (! strchr (mode.c_str (), 98)) { - error ("prcv: stream not binary\n"); + error ("__bw_prcv__: stream not binary\n"); return retval; } #endif } else { - error ("prcv: stream not open\n"); + error ("__bw_prcv__: stream not open\n"); return retval; } @@ -100,50 +100,17 @@ // after the header exactly one variable is expected. This // is mended by asking for EOF here. if (ps.eof () || error_state || name.empty ()) { - error ("prcv: error in reading variable data\n"); + error ("__bw_prcv__: error in reading variable data\n"); return retval; } if (! tc.is_defined ()) { // What means this? - error ("prcv: error in reading variable\n"); + error ("__bw_prcv__: error in reading variable\n"); return retval; } - if (nargout == 1) { - retstruct.assign(name, tc); - retval = retstruct; - } - else { - // install_loaded_variable () is static ... here the - // code equivalent to - // - // install_loaded_variable (true, name, tc, global, doc); - // - // is duplicated (except one error check) ... + retstruct.assign(name, tc); + retval = retstruct; - symbol_record *lsr = curr_sym_tab->lookup (name); - - bool is_undefined = true; - bool is_variable = false; - - if (lsr) { - is_undefined = ! lsr->is_defined (); - is_variable = lsr->is_variable (); - } - - symbol_record *sr = 0; - - if (! global && (is_variable || is_undefined)) { - lsr = curr_sym_tab->lookup (name, true); - sr = lsr; - } - else { - lsr = curr_sym_tab->lookup (name, true); - link_to_global_variable (lsr); - sr = lsr; - } - sr->define (tc); - sr->document (doc); - } return retval; } Modified: trunk/octave-forge/main/parallel/src/__bw_psend__.cc =================================================================== --- trunk/octave-forge/main/parallel/src/__bw_psend__.cc 2009-08-19 20:32:55 UTC (rev 6117) +++ trunk/octave-forge/main/parallel/src/__bw_psend__.cc 2009-08-20 13:40:14 UTC (rev 6118) @@ -19,72 +19,31 @@ #include <octave/ls-oct-binary.h> #include <octave/oct-stream.h> -DEFUN_DLD (__bw_psend__, args, , "psend (pd, name[, value])\n\ -Sends variable named in 'name' through pipe stream 'pd'.\n\ -With 'value' given and having boolean value 'true', the\n\ -contents of the second argument itself is sent under the name\n\ -'psend_var'.\n\ +DEFUN_DLD (__bw_psend__, args, , "psend (pd, var)\n\ +The contents of 'var' is sent through the pipe stream 'pd'\n\ +under the name 'psend_var'.\n\ The variable is coded in Octaves binary format,\n\ a header is included. It can be read by 'prcv ()'.\n\ \n\ This function may change and is internal to the parallel package.\n") { - std::string name; - std::string help; - int global; octave_value retval; octave_value tc; - bool contents; - if (args.length () == 2) - contents = false; - else if (args.length () == 3) { - if (! args(2).is_real_scalar ()) { - error ("psend: third variable, if given, must be a real scalar.\n"); - return retval; - } - contents = args(2).scalar_value (); - } else { - error ("psend: two or three arguments required\n"); + if (args.length () != 2) { + error ("__bw_psend__: two arguments required\n"); return retval; } - if (contents) { - name = "psend_var"; - tc = args(1); - help = ""; - global = false; - } - else { - if (args(1).is_string ()) name = args(1).string_value (); - else { - error ("psend: if named variable is to be sent, second argument must be a string\n"); - return retval; - } - symbol_record *var = curr_sym_tab->lookup (name); - if (! var) { - error ("psend: no such variable %s\n", name.c_str ()); - return retval; - } - tc = var->def (); - help = var->help (); - global = var->is_linked_to_global (); - } - if (! tc.is_defined ()) { - // What means this? - error ("psend: variable not defined\n"); - return retval; - } - - octave_stream os = octave_stream_list::lookup (args(0), "psend"); + octave_stream os = octave_stream_list::lookup (args(0), "__bw_psend__"); if (error_state) { - error ("psend: no valid file id\n"); + error ("__bw_psend__: no valid file id\n"); return retval; } if (os.is_open ()) { std::string mode = os.mode_as_string (os.mode ()); if (mode == "r" || mode == "rb") { - error ("psend: stream not writable\n"); + error ("__bw_psend__: stream not writable\n"); return retval; } #ifdef PATCHED_PIPE_CODE_15TH_JUNE_07 @@ -94,20 +53,20 @@ // 98: "b" if (! strchr (mode.c_str (), 98)) { - error ("psend: stream not binary\n"); + error ("__bw_psend__: stream not binary\n"); return retval; } #endif } else { - error ("psend: stream not open\n"); + error ("__bw_psend__: stream not open\n"); return retval; } std::ostream *tps = os.output_stream (); std::ostream& ps = *tps; write_header (ps, LS_BINARY); - save_binary_data (ps, tc, name, help, global, false); + save_binary_data (ps, args(1), "psend_var", "", false, false); return retval; } Modified: trunk/octave-forge/main/parallel/src/pserver.cc =================================================================== --- trunk/octave-forge/main/parallel/src/pserver.cc 2009-08-19 20:32:55 UTC (rev 6117) +++ trunk/octave-forge/main/parallel/src/pserver.cc 2009-08-20 13:40:14 UTC (rev 6118) @@ -44,6 +44,7 @@ #include "input.h" #include "quit.h" +#include <iostream> #include <stdio.h> #include <sys/types.h> #include <sys/socket.h> This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <i7...@us...> - 2010-02-09 10:53:06
|
Revision: 6856 http://octave.svn.sourceforge.net/octave/?rev=6856&view=rev Author: i7tiol Date: 2010-02-09 10:52:58 +0000 (Tue, 09 Feb 2010) Log Message: ----------- parallel/inst/getid.m, scloseall.m: added placeholder for helptext Modified Paths: -------------- trunk/octave-forge/main/parallel/DESCRIPTION trunk/octave-forge/main/parallel/inst/getid.m trunk/octave-forge/main/parallel/inst/scloseall.m Modified: trunk/octave-forge/main/parallel/DESCRIPTION =================================================================== --- trunk/octave-forge/main/parallel/DESCRIPTION 2010-02-09 10:05:10 UTC (rev 6855) +++ trunk/octave-forge/main/parallel/DESCRIPTION 2010-02-09 10:52:58 UTC (rev 6856) @@ -1,6 +1,6 @@ Name: Parallel Version: 2.0.1 -Date: 2009-08-19 +Date: 2010-02-09 Author: Hayato Fujiwara and Olaf Till <ola...@un...> Maintainer: Hayato Fujiwara and Olaf Till <ola...@un...> Title: Parallel Computing. Modified: trunk/octave-forge/main/parallel/inst/getid.m =================================================================== --- trunk/octave-forge/main/parallel/inst/getid.m 2010-02-09 10:05:10 UTC (rev 6855) +++ trunk/octave-forge/main/parallel/inst/getid.m 2010-02-09 10:52:58 UTC (rev 6856) @@ -15,6 +15,8 @@ function retval = getid (sockets) + ## getid (sockets) + retval = find(sockets==0)(1); endfunction Modified: trunk/octave-forge/main/parallel/inst/scloseall.m =================================================================== --- trunk/octave-forge/main/parallel/inst/scloseall.m 2010-02-09 10:05:10 UTC (rev 6855) +++ trunk/octave-forge/main/parallel/inst/scloseall.m 2010-02-09 10:52:58 UTC (rev 6856) @@ -15,6 +15,8 @@ function scloseall (sockets) + ## scloseall (sockets)x + reval("sclose(sockets);exit;",sockets); sclose(sockets); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <i7...@us...> - 2010-08-05 10:35:27
|
Revision: 7496 http://octave.svn.sourceforge.net/octave/?rev=7496&view=rev Author: i7tiol Date: 2010-08-05 10:35:19 +0000 (Thu, 05 Aug 2010) Log Message: ----------- Fixed several bugs. Modified Paths: -------------- trunk/octave-forge/main/parallel/DESCRIPTION trunk/octave-forge/main/parallel/src/connect.cc trunk/octave-forge/main/parallel/src/pserver.cc Modified: trunk/octave-forge/main/parallel/DESCRIPTION =================================================================== --- trunk/octave-forge/main/parallel/DESCRIPTION 2010-08-03 21:29:06 UTC (rev 7495) +++ trunk/octave-forge/main/parallel/DESCRIPTION 2010-08-05 10:35:19 UTC (rev 7496) @@ -1,5 +1,5 @@ Name: Parallel -Version: 2.0.1 +Version: 2.0.2 Date: 2010-02-09 Author: Hayato Fujiwara and Olaf Till <ola...@un...> Maintainer: Hayato Fujiwara and Olaf Till <ola...@un...> Modified: trunk/octave-forge/main/parallel/src/connect.cc =================================================================== --- trunk/octave-forge/main/parallel/src/connect.cc 2010-08-03 21:29:06 UTC (rev 7495) +++ trunk/octave-forge/main/parallel/src/connect.cc 2010-08-05 10:35:19 UTC (rev 7496) @@ -70,10 +70,10 @@ struct hostent *he; octave_value hosts=args(0); charMatrix cm=hosts.char_matrix_value(); - char *host,*pt,myname[16]; + char *host,*pt; // ,myname[16]; errno=0; - gethostname(myname,15); + // gethostname(myname,15); row= cm.rows(); col= cm.columns(); cm= cm.transpose(); @@ -209,17 +209,17 @@ nl=htonl(pid); write_if_no_error(sock,&nl,sizeof(int),error_state); //send name size - strncpy(myname,cm.data(),col); - pt=strchr(myname,' '); + strncpy(host,cm.data(),col); + pt=strchr(host,' '); if(pt==NULL) - myname[col]='\0'; + host[col]='\0'; else *pt='\0'; - len=strlen(myname); + len=strlen(host); nl=htonl(len); write_if_no_error(sock,&nl,sizeof(int),error_state); //send name - write_if_no_error(sock,myname,len+1,error_state); + write_if_no_error(sock,host,len+1,error_state); //recv result code read_if_no_error(sock,&nl,sizeof(int),error_state); result=ntohl(nl); Modified: trunk/octave-forge/main/parallel/src/pserver.cc =================================================================== --- trunk/octave-forge/main/parallel/src/pserver.cc 2010-08-03 21:29:06 UTC (rev 7495) +++ trunk/octave-forge/main/parallel/src/pserver.cc 2010-08-05 10:35:19 UTC (rev 7496) @@ -388,11 +388,11 @@ read(asock,&nl,sizeof(int)); pppid=ntohl(nl); sock_v=(int *)calloc((num_nodes+1)*3,sizeof(int)); - host_list=(char **)calloc(num_nodes,sizeof(char *)); + host_list=(char **)calloc(num_nodes+1,sizeof(char *)); for(i=0;i<=num_nodes;i++){ read(asock,&nl,sizeof(int)); len=ntohl(nl); - host_list[i]=(char *)calloc(len,sizeof(char *)); + host_list[i]=(char *)calloc(len,sizeof(char)); read(asock,host_list[i],len); } @@ -423,7 +423,7 @@ ol=sizeof(bufsize); setsockopt(dasock,SOL_SOCKET,SO_REUSEADDR,&bufsize,ol); - //recv pppid + //recv pppid (of connecting process at master) read(dasock,&nl,sizeof(int)); rpppid=ntohl(nl); //recv name size @@ -445,7 +445,6 @@ //send result code if(result==0){ if(pppid==rpppid){ - result=0; nl=htonl(result); write(dasock,&nl,sizeof(int)); //send endian @@ -458,10 +457,12 @@ #endif write(dasock,&nl,sizeof(int)); //recv endian - read(sock,&nl,sizeof(int)); + read(dasock,&nl,sizeof(int)); sock_v[j+2*(num_nodes+1)]=ntohl(nl); break; - } + } // And else? Shouldn't this test have been made + // before? What is the policy if a different + // process at the master meddles in? }else{ result=-1; nl=htonl(result); @@ -513,8 +514,8 @@ setsockopt(dsock,SOL_SOCKET,SO_REUSEADDR,&bufsize,ol); //send pppid + nl=htonl(pppid); write(dsock,&nl,sizeof(int)); - pppid=ntohl(nl); //send name size len=strlen(host_list[me]); nl=htonl(len); @@ -528,7 +529,7 @@ if(result==0){ sock_v[i]=dsock; //recv endian - read(sock,&nl,sizeof(int)); + read(dsock,&nl,sizeof(int)); sock_v[i+2*(num_nodes+1)]=ntohl(nl); //send endian #if defined (__BYTE_ORDER) @@ -538,7 +539,7 @@ #else # error "can not determine the byte order" #endif - write(sock,&nl,sizeof(int)); + write(dsock,&nl,sizeof(int)); break; }else{ close(dsock); @@ -546,11 +547,11 @@ } free(addr); } - /* for(i=0;i<=num_nodes;i++){ + for(i=0;i<=num_nodes;i++){ free(host_list[i]); } free(host_list); - */ + //normal act install_signal_handlers (); @@ -563,10 +564,12 @@ sprintf(s,"sockets=zeros(%d,3)",num_nodes+1); eval_string(std::string(s),true,stat); for(i=0;i<=num_nodes;i++){ - sprintf(s,"sockets(i+1,:)=[%d,0,%d]",sock_v[i],sock_v[i+2*(num_nodes+1)]); + sprintf(s,"sockets(%d,:)=[%d,0,%d]",i+1,sock_v[i],sock_v[i+2*(num_nodes+1)]); eval_string(std::string(s),true,stat); } + free(sock_v); + interactive = false; line_editing = false; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <i7...@us...> - 2010-08-13 13:27:43
|
Revision: 7526 http://octave.svn.sourceforge.net/octave/?rev=7526&view=rev Author: i7tiol Date: 2010-08-13 13:27:35 +0000 (Fri, 13 Aug 2010) Log Message: ----------- Original 'parallel' package now uses Octaves save and load functionality and got a select_sockets function. - "send", "recv": replaced with code that uses Octaves save and load functionality. - "select_sockets": new function, wrapper to "select". Modified Paths: -------------- trunk/octave-forge/main/parallel/DESCRIPTION trunk/octave-forge/main/parallel/INDEX trunk/octave-forge/main/parallel/doc/README trunk/octave-forge/main/parallel/doc/README.parallel trunk/octave-forge/main/parallel/src/Makefile trunk/octave-forge/main/parallel/src/connect.cc trunk/octave-forge/main/parallel/src/pserver.cc trunk/octave-forge/main/parallel/src/recv.cc trunk/octave-forge/main/parallel/src/sclose.cc trunk/octave-forge/main/parallel/src/send.cc Added Paths: ----------- trunk/octave-forge/main/parallel/inst/select_sockets.m trunk/octave-forge/main/parallel/src/sock-stream.cc trunk/octave-forge/main/parallel/src/sock-stream.h Removed Paths: ------------- trunk/octave-forge/main/parallel/src/select_sockets.cc Modified: trunk/octave-forge/main/parallel/DESCRIPTION =================================================================== --- trunk/octave-forge/main/parallel/DESCRIPTION 2010-08-13 12:02:38 UTC (rev 7525) +++ trunk/octave-forge/main/parallel/DESCRIPTION 2010-08-13 13:27:35 UTC (rev 7526) @@ -1,10 +1,11 @@ Name: Parallel -Version: 2.0.2 -Date: 2010-02-09 +Version: 2.0.3 +Date: 2010-08-13 Author: Hayato Fujiwara and Olaf Till <ola...@un...> -Maintainer: Hayato Fujiwara and Olaf Till <ola...@un...> +Maintainer: Olaf Till <ola...@un...> Title: Parallel Computing. -Description: Parallel execution package for cluster computers. For +Description: Parallel execution package for cluster computers. See + also package openmpi_ext, maintained by Riccardo Corradini. For parallel execution on a single machine see e.g. function parcellfun (author: Jaroslav Hajek) in package general. Depends: octave (>= 3.0.0) Modified: trunk/octave-forge/main/parallel/INDEX =================================================================== --- trunk/octave-forge/main/parallel/INDEX 2010-08-13 12:02:38 UTC (rev 7525) +++ trunk/octave-forge/main/parallel/INDEX 2010-08-13 13:27:35 UTC (rev 7526) @@ -5,6 +5,7 @@ connect sclose scloseall + select_sockets Internal __bw_lock_file__ __bw_is_locked__ Modified: trunk/octave-forge/main/parallel/doc/README =================================================================== --- trunk/octave-forge/main/parallel/doc/README 2010-08-13 12:02:38 UTC (rev 7525) +++ trunk/octave-forge/main/parallel/doc/README 2010-08-13 13:27:35 UTC (rev 7526) @@ -9,9 +9,10 @@ function. Please see "README.bw". As yet, the package only contains functions for parallel execution on -a cluster of machines. For parallel execution on a single machine, see +a cluster of machines. See also package openmpi_ext, maintained by +Riccardo Corradini. For parallel execution on a single machine, see for example "parcellfun" (author: Jaroslav Hajek) in package "general". -Olaf Till <ola...@un...>, 2009-03-27 +Olaf Till <ola...@un...>, 2010-08-11 Modified: trunk/octave-forge/main/parallel/doc/README.parallel =================================================================== --- trunk/octave-forge/main/parallel/doc/README.parallel 2010-08-13 12:02:38 UTC (rev 7525) +++ trunk/octave-forge/main/parallel/doc/README.parallel 2010-08-13 13:27:35 UTC (rev 7526) @@ -13,11 +13,18 @@ Requirements - 1. GNU Octave 2.1.43 that is available at http://www.octave.org/ + 1. GNU Octave >= 3.0.0 that is available at http://www.octave.org/ 2. two or more computers +Security note + Commands are sent to slave machines over TCP connections to port + 12502, data is sent between machines over TCP connections to port + 12501. With the current version, it is your own responsibility to + secure these ports against unauthorized access. + + How to use Note first that this package assumes the use in a multiple-computer system consisting of a master (your console) computer and some @@ -89,8 +96,10 @@ sclose (sockets); +select_sockets: [N, IDX] = select_sockets (SOCKETS, TIMEOUT[, NFDS]) + See help-text of this function. Returns an index to rows in "sockets" + with pending input from slave machines. Useful for advanced usage. - Notes for the current version: * The slave computer must have the directory whose name and path are identical with the current directory of the master computer. @@ -120,7 +129,7 @@ hosts = [ "host1"; "host2"; "host3"; "host4" ]; sockets = connect(hosts); s="Hello, again!"; -send(s,sockets(2,:)); +send(s,sockets(2,:)); # for larger data than contained in "s", this command should be given _after_ the following "reval" commands reval( "s=recv(sockets(1,:));",sockets(2,:)); reval( "send(s,sockets(3,:));",sockets(2,:)); reval( "s=recv(sockets(2,:));",sockets(3,:)); @@ -135,11 +144,11 @@ License: This package is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by - the Free Software Foundation; either version 2, or (at your option) - any later version. + the Free Software Foundation. See individual files for GPL Version. Comments and suggestions should be directed to: h_f...@us... +or to the current package maintainer. \ No newline at end of file Added: trunk/octave-forge/main/parallel/inst/select_sockets.m =================================================================== --- trunk/octave-forge/main/parallel/inst/select_sockets.m (rev 0) +++ trunk/octave-forge/main/parallel/inst/select_sockets.m 2010-08-13 13:27:35 UTC (rev 7526) @@ -0,0 +1,46 @@ +## Copyright (C) 2010 Olaf Till <ola...@un...> +## +## This program is free software; you can redistribute it and/or modify +## it under the terms of the GNU General Public License as published by +## the Free Software Foundation; either version 3 of the License, or +## (at your option) any later version. +## +## This program is distributed in the hope that it will be useful, +## but WITHOUT ANY WARRANTY; without even the implied warranty of +## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +## GNU General Public License for more details. +## +## You should have received a copy of the GNU General Public License +## along with this program; if not, write to the Free Software +## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +## -*- texinfo -*- +## @deftypefn {Function File} {[@var{n}, @var{idx}] =} select_sockets +## (@var{sockets}, @var{timeout}[, @var{nfds}])\n\ +## Calls Unix @code{select}.\n\ +## @var{sockets}: valid sockets matrix as returned by @code{connect}.\n\ +## @var{timeout}: seconds, negative for infinite.\n\ +## @var{nfds}: optional, default is Unix' FD_SETSIZE (platform specific).\n\ +## Passed to Unix @code{select} as the first argument --- see there.\n\ +## An error is returned if nfds or a watched filedescriptor \n\ +## plus one exceeds FD_SETSIZE.\n\ +## Return values are:\n\ +## @var{idx}: index vector to rows in @var{sockets} with pending input\n\ +## readable with @code{recv}.\n\ +## @var{n}: number of rows in @var{sockets} with pending input.\n\ +## @end deftypefn + +function [n, ridx] = select_sockets (varargin) + + if ((nargin = columns (varargin)) < 2 || nargin > 3) + error ("two or three arguments required"); + endif + + if (! ismatrix (sockets = varargin{1}) || rows (sockets < 1) || \ + colums (sockets) != 3) + error ("no valid sockets matrix"); + endif + + [n, ridx] = \ + select (cat \ + (2, {varargin{1}(:, 1)}, {[], []}, varargin(2:end)){:}); Modified: trunk/octave-forge/main/parallel/src/Makefile =================================================================== --- trunk/octave-forge/main/parallel/src/Makefile 2010-08-13 12:02:38 UTC (rev 7525) +++ trunk/octave-forge/main/parallel/src/Makefile 2010-08-13 13:27:35 UTC (rev 7526) @@ -1,5 +1,5 @@ OCTS = sclose.oct connect.oct pserver.oct \ - recv.oct reval.oct send.oct select_sockets.oct \ + recv.oct reval.oct send.oct \ __bw_is_locked__.oct __bw_lock_file__.oct \ __bw_unlock_file__.oct \ __bw_prcv__.oct __bw_psend__.oct __internal_exit__.oct \ @@ -9,6 +9,6 @@ all: $(OCTS) %.oct: %.cc - mkoctfile -s $< + mkoctfile -s $< sock-stream.cc clean: ; rm *.o *.oct Modified: trunk/octave-forge/main/parallel/src/connect.cc =================================================================== --- trunk/octave-forge/main/parallel/src/connect.cc 2010-08-13 12:02:38 UTC (rev 7525) +++ trunk/octave-forge/main/parallel/src/connect.cc 2010-08-13 13:27:35 UTC (rev 7526) @@ -17,6 +17,8 @@ */ +// TODO: error handling is a mess + #include <octave/oct.h> #include "defun-dld.h" @@ -39,6 +41,8 @@ #include <unistd.h> #include <netinet/in.h> +#include "sock-stream.h" + #define BUFF_SIZE SSIZE_MAX // COMM @@ -60,8 +64,8 @@ \n\ Connect hosts and return sockets.") { - - int sock=0,col=0,row=0,i,j,len; + octave_value retval; + int sock=0,col=0,row=0,i,j,len, not_connected; double *sock_v=0; if (args.length () == 1) { @@ -105,8 +109,10 @@ } memcpy(&addr->sin_addr,he->h_addr_list[0],he->h_length); + not_connected = 1; for(j=0;j<10;j++){ - if(connect(sock,(struct sockaddr *)addr,sizeof(*addr))==0){ + if((not_connected = + connect(sock,(struct sockaddr *)addr,sizeof(*addr)))==0){ break; }else if(errno!=ECONNREFUSED){ error("connect error "); @@ -114,43 +120,48 @@ usleep(5000); } } - if(!sock) - error("Unable to connect to %s: Connection refused",host); - - sock_v[i+row]=sock; - + free(addr); free(host); - int num_nodes=row-1; + if(not_connected) - pid=getpid(); - nl=htonl(num_nodes); - write_if_no_error(sock,&nl,sizeof(int),error_state); - nl=htonl(i); - write_if_no_error(sock,&nl,sizeof(int),error_state); - nl=htonl(pid); - write_if_no_error(sock,&nl,sizeof(int),error_state); - host=(char *)calloc(128,sizeof(char)); - for(j=0;j<row;j++){ - strncpy(host,&cm.data()[col*j],col); - pt=strchr(host,' '); - if(pt==NULL) - host[col]='\0'; - else - *pt='\0'; - len=strlen(host)+1; - nl=htonl(len); - write_if_no_error(sock,&nl,sizeof(int),error_state); - write_if_no_error(sock,host,len,error_state); - } - free(host); - int comm_len; - std::string directory = octave_env::getcwd (); - comm_len=directory.length(); - nl=htonl(comm_len); - write_if_no_error(sock,&nl,sizeof(int),error_state); - write_if_no_error(sock,directory.c_str(),comm_len,error_state); + error("Unable to connect to %s: Connection refused",host); + + else + { + sock_v[i+row]=sock; + + int num_nodes=row-1; + + pid=getpid(); + nl=htonl(num_nodes); + write_if_no_error(sock,&nl,sizeof(int),error_state); + nl=htonl(i); + write_if_no_error(sock,&nl,sizeof(int),error_state); + nl=htonl(pid); + write_if_no_error(sock,&nl,sizeof(int),error_state); + host=(char *)calloc(128,sizeof(char)); + for(j=0;j<row;j++){ + strncpy(host,&cm.data()[col*j],col); + pt=strchr(host,' '); + if(pt==NULL) + host[col]='\0'; + else + *pt='\0'; + len=strlen(host)+1; + nl=htonl(len); + write_if_no_error(sock,&nl,sizeof(int),error_state); + write_if_no_error(sock,host,len,error_state); + } + free(host); + int comm_len; + std::string directory = octave_env::getcwd (); + comm_len=directory.length(); + nl=htonl(comm_len); + write_if_no_error(sock,&nl,sizeof(int),error_state); + write_if_no_error(sock,directory.c_str(),comm_len,error_state); + } } usleep(100); @@ -180,71 +191,78 @@ error("Unknown host %s",host); } memcpy(&addr->sin_addr,he->h_addr_list[0],he->h_length); - while(1){ - for(j=0;j<10;j++){ - if(connect(sock,(struct sockaddr *)addr,sizeof(*addr))==0){ - break; - }else if(errno!=ECONNREFUSED){ - perror("connect : "); - exit(-1); - }else { - usleep(5000); - } + + not_connected = 1; + for(j=0;j<10;j++){ + if((not_connected = + connect(sock,(struct sockaddr *)addr,sizeof(*addr)))==0){ + break; + }else if(errno!=ECONNREFUSED){ + perror("connect error "); + }else { + usleep(5000); } - if(!sock) - error("Unable to connect to %s: Connection refused",host); - - int bufsize=262144; - socklen_t ol; - ol=sizeof(bufsize); - setsockopt(sock,SOL_SOCKET,SO_SNDBUF,&bufsize,ol); - setsockopt(sock,SOL_SOCKET,SO_RCVBUF,&bufsize,ol); - bufsize=1; - ol=sizeof(bufsize); - setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,&bufsize,ol); - + } + if(not_connected) - int len=0,result=0;; - //send pppid - nl=htonl(pid); - write_if_no_error(sock,&nl,sizeof(int),error_state); - //send name size - strncpy(host,cm.data(),col); - pt=strchr(host,' '); - if(pt==NULL) - host[col]='\0'; - else - *pt='\0'; - len=strlen(host); - nl=htonl(len); - write_if_no_error(sock,&nl,sizeof(int),error_state); - //send name - write_if_no_error(sock,host,len+1,error_state); - //recv result code - read_if_no_error(sock,&nl,sizeof(int),error_state); - result=ntohl(nl); - if(result==0){ - sock_v[i]=sock; - //recv endian + error("Unable to connect to %s: Connection refused",host); + + else + { + int bufsize=262144; + socklen_t ol; + ol=sizeof(bufsize); + setsockopt(sock,SOL_SOCKET,SO_SNDBUF,&bufsize,ol); + setsockopt(sock,SOL_SOCKET,SO_RCVBUF,&bufsize,ol); + bufsize=1; + ol=sizeof(bufsize); + setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,&bufsize,ol); + + int len=0,result=0;; + //send pppid + nl=htonl(pid); + write_if_no_error(sock,&nl,sizeof(int),error_state); + //send name size + strncpy(host,cm.data(),col); + pt=strchr(host,' '); + if(pt==NULL) + host[col]='\0'; + else + *pt='\0'; + len=strlen(host); + nl=htonl(len); + write_if_no_error(sock,&nl,sizeof(int),error_state); + //send name + write_if_no_error(sock,host,len+1,error_state); + //recv result code read_if_no_error(sock,&nl,sizeof(int),error_state); - sock_v[i+2*row]=ntohl(nl); - //send endian + result=ntohl(nl); + if(result==0){ + sock_v[i]=sock; + //recv endian + read_if_no_error(sock,&nl,sizeof(int),error_state); + sock_v[i+2*row]=ntohl(nl); + //send endian #if defined (__BYTE_ORDER) - nl=htonl(__BYTE_ORDER); + nl=htonl(__BYTE_ORDER); #elif defined (BYTE_ORDER) - nl=htonl(BYTE_ORDER); + nl=htonl(BYTE_ORDER); #else # error "can not determine the byte order" #endif - write_if_no_error(sock,&nl,sizeof(int),error_state); - break; - }else{ - close(sock); + write_if_no_error(sock,&nl,sizeof(int),error_state); + socket_to_oct_iostream (sock); + + }else{ + close(sock); + } } - } - + free(addr); free(host); + + if (error_state) + return retval; } char lf='\n'; @@ -256,16 +274,14 @@ else { print_usage (); - octave_value retval; return retval; } - Matrix mx(row,3); double *tmp =mx.fortran_vec(); for (i=0;i<3*row;i++) tmp[i]=sock_v[i]; - octave_value retval(mx); + retval = octave_value (mx); return retval; } Modified: trunk/octave-forge/main/parallel/src/pserver.cc =================================================================== --- trunk/octave-forge/main/parallel/src/pserver.cc 2010-08-13 12:02:38 UTC (rev 7525) +++ trunk/octave-forge/main/parallel/src/pserver.cc 2010-08-13 13:27:35 UTC (rev 7526) @@ -59,6 +59,8 @@ #include <setjmp.h> #include <netinet/in.h> +#include "sock-stream.h" + // SSIZE_MAX might be for 64-bit. Limit to 2^31-1 #define BUFF_SIZE 2147483647 @@ -459,6 +461,7 @@ //recv endian read(dasock,&nl,sizeof(int)); sock_v[j+2*(num_nodes+1)]=ntohl(nl); + socket_to_oct_iostream (dasock); break; } // And else? Shouldn't this test have been made // before? What is the policy if a different @@ -471,6 +474,8 @@ sleep(1); } } + if (error_state) + _exit (-1); } // close(dsock); //me @@ -540,12 +545,15 @@ # error "can not determine the byte order" #endif write(dsock,&nl,sizeof(int)); + socket_to_oct_iostream (dsock); break; }else{ close(dsock); } } free(addr); + if (error_state) + _exit (-1); } for(i=0;i<=num_nodes;i++){ free(host_list[i]); Modified: trunk/octave-forge/main/parallel/src/recv.cc =================================================================== --- trunk/octave-forge/main/parallel/src/recv.cc 2010-08-13 12:02:38 UTC (rev 7525) +++ trunk/octave-forge/main/parallel/src/recv.cc 2010-08-13 13:27:35 UTC (rev 7526) @@ -1,278 +1,90 @@ -/* +// Copyright (C) 2010 Olaf Till <ola...@un...> -Copyright (C) 2002 Hayato Fujiwara +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation; either version 3 of the License, or +// (at your option) any later version. -This program is free software; you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation; either version 2 of the License, or -(at your option) any later version. +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -You should have received a copy of the GNU General Public License -along with this program; If not, see <http://www.gnu.org/licenses/>. - -*/ - #include <octave/oct.h> +#include <octave/load-save.h> +#include <octave/ls-oct-binary.h> +#include <octave/oct-stream.h> +#include <octave/oct-map.h> -#include "defun-dld.h" -#include "dirfns.h" -#include "error.h" -#include "help.h" -#include "oct-map.h" -#include "systime.h" -#include "ov.h" -#include "oct-obj.h" -#include "utils.h" -#include "oct-env.h" +DEFUN_DLD (recv, args, nargout, "recv (socket)\n\ +\n\ +Receive a variable from the computer specified by the row vector 'socket'.\n") +{ + octave_value retval; -#include <stdio.h> -#include <sys/types.h> -#include <sys/socket.h> -#include <sys/poll.h> -#include <netinet/in.h> -#include <errno.h> -#include <netdb.h> -#include <unistd.h> + if (args.length () != 1) + { + error ("exactly one argument required\n"); + return retval; + } -#include "swab.h" + Matrix socket = args(0).matrix_value (); -// SSIZE_MAX might be for 64-bit. Limit to 2^31-1 -#define BUFF_SIZE 2147483647 + if (error_state) + return retval; -// COMM + octave_stream is = octave_stream_list::lookup + (octave_value (socket(0, 0)), "recv"); -DEFUN_DLD (recv, args, , - "recv (socket)\n\ -\n\ -Receive a variable from the computer specified by the row vector 'socket'.") -{ - octave_value retval; - int type_id=0,sock; - - if(args.length () == 1) + if (error_state) return retval; + + if (! is.is_open ()) { - Matrix m=args(0).matrix_value(); - struct pollfd *pollfd,*pollfd_d; - int num,error_code,sock_c,num_d,nl,endian; - - sock=(int) m.data()[0]; - sock_c=(int) m.data()[1]; - endian=(int) m.data()[2]; + error ("stream not open\n"); + return retval; + } - pollfd=(struct pollfd *)malloc(sizeof(struct pollfd)); - pollfd_d=(struct pollfd *)malloc(sizeof(struct pollfd)); - pollfd[0].fd=sock_c; - pollfd_d[0].fd=sock; - pollfd[0].events=0; - pollfd[0].events=POLLIN|POLLERR|POLLHUP; - pollfd_d[0].events=pollfd[0].events; - pollfd_d[0].revents=0; + std::istream *tps = is.input_stream (); + std::istream& ps = *tps; - num_d=0; - while(!num_d){ - if(sock_c){ - int pid; - pollfd[0].revents=0; - num=poll(pollfd,1,0); - if(num){ - if(pollfd[0].revents && (pollfd[0].fd !=0)){ - sockaddr_in r_addr; - struct hostent *hehe; - socklen_t len = sizeof(r_addr); - getpeername(pollfd[0].fd, (sockaddr*)&r_addr, &len ); - hehe=gethostbyaddr((char *)&r_addr.sin_addr.s_addr,sizeof(r_addr.sin_addr), AF_INET); - if(pollfd[0].revents&POLLIN){ - pid=getpid(); - read(pollfd[0].fd,&nl,sizeof(int)); - error_code=ntohl(nl); - write(pollfd[0].fd,&nl,sizeof(int)); - error("error occurred in %s\n\tsee %s:/tmp/octave_error-%s_%5d.log for detail",hehe->h_name,hehe->h_name,hehe->h_name,pid ); - } - if(pollfd[0].revents&POLLERR){ - error("Error condition - %s",hehe->h_name ); - } - if(pollfd[0].revents&POLLHUP){ - error("Hung up - %s",hehe->h_name ); - } - } - } - } - num_d=poll(pollfd_d,1,1000); - } - if(pollfd_d[0].revents && (pollfd_d[0].fd !=0)){ - if(pollfd_d[0].revents&POLLIN){ - read(sock,&nl,sizeof(int)); - type_id=ntohl(nl); - } - if(pollfd_d[0].revents&POLLERR){ - error("Error condition "); - } - if(pollfd_d[0].revents&POLLHUP){ - error("Hung up " ); - } - } + bool global = false; + octave_value tc; + std::string name; + std::string doc; + bool swap; + oct_mach_info::float_format flt_fmt; - if(type_id==3) - { - int col=0,row=0,length=0,count=0,r_len=0; - unsigned long long int *conv; - double *p; - read(sock,&nl,sizeof(int)); - row=ntohl(nl); - read(sock,&nl,sizeof(int)); - col=ntohl(nl); - length=sizeof(double)*row*col; - Matrix m(row,col); - double* tmp = m.fortran_vec(); - errno=0; - r_len=BUFF_SIZE; - while(count <length){ - p=tmp+(count/sizeof(double)); - if((length-count) < BUFF_SIZE) - r_len=length-count; - count +=read(sock,p,r_len); -#if defined (__BYTE_ORDER) - if(endian != __BYTE_ORDER){ -#elif defined (BYTE_ORDER) - if(endian != BYTE_ORDER){ -#else -# error "can not determine the byte order" -#endif - conv=(unsigned long long int *)((unsigned long long int)p & 0xfffffff8ULL); - for(int i=0;i<count/8;i++) - *conv++=swab64(conv); - } - } - retval= octave_value(m); - } - else if(type_id==1) - { - double d; - unsigned long long int *conv; - read(sock,&d,sizeof(d)); -#if defined (__BYTE_ORDER) - if(endian != __BYTE_ORDER){ -#elif defined (BYTE_ORDER) - if(endian != BYTE_ORDER){ -#else -# error "can not determine the byte order" -#endif - conv=(unsigned long long int *)&d; - *conv=swab64(conv); - } - retval= octave_value(d); - } - else if(type_id==4) - { - int col=0,row=0,length=0,count=0,r_len=0; - Complex *p; - unsigned long long int *conv; - read(sock,&nl,sizeof(int)); - row=ntohl(nl); - read(sock,&nl,sizeof(int)); - col=ntohl(nl); - length=sizeof(Complex)*row*col; - ComplexMatrix cm(row,col); - Complex* tmp = cm.fortran_vec(); - errno=0; - r_len=BUFF_SIZE; - while(count <length){ - p=tmp+(count/sizeof(Complex)); - if((length-count) < BUFF_SIZE) - r_len=length-count; - count +=read(sock,p,r_len); -#if defined (__BYTE_ORDER) - if(endian != __BYTE_ORDER){ -#elif defined (BYTE_ORDER) - if(endian != BYTE_ORDER){ -#else -# error "can not determine the byte order" -#endif - conv=(unsigned long long int *)((unsigned long long int)p & 0xfffffff8ULL); - for(int i=0;i<count/8;i++){ - *conv++=swab64(conv); - } - } - } - retval= octave_value(cm); - - } - else if(type_id==2) - { - Complex cx; - - read(sock,&cx,sizeof(cx)); -#if defined (__BYTE_ORDER) - if(endian != __BYTE_ORDER){ -#elif defined (BYTE_ORDER) - if(endian != BYTE_ORDER){ -#else -# error "can not determine the byte order" -#endif - long long int *conv; - conv=(long long int *)&cx; - *conv++=swab64(conv); - *conv=swab64(conv); - } - retval= octave_value(cx); - - } - else if(type_id==6) - { - int col=0,row=0,length=0,count=0,r_len=0; - - read(sock,&nl,sizeof(int)); - row=ntohl(nl); - read(sock,&nl,sizeof(int)); - col=ntohl(nl); - length=sizeof(char)*row*col; - charMatrix cmx(row,col); - char* str = cmx.fortran_vec(); - errno=0; - r_len=BUFF_SIZE; - while(count <length){ - if((length-count) < BUFF_SIZE) - r_len=length-count; - count +=read(sock,(str+count),r_len); - } - retval= octave_value(cmx,1); - - } - else if(type_id==7) - { - int i,length=0,key_len=0; - Octave_map m; - octave_value_list ov_list; - char *key; - - read(sock,&nl,sizeof(nl)); - length=ntohl(nl); - for(i=0;i<length;i++){ - read(sock,&nl,sizeof(int)); - key_len=ntohl(nl); - key = (char *)calloc(sizeof(char),key_len+1); - read(sock,key,key_len); - ov_list=Frecv(args(0),0); - m.assign(key,ov_list(0)); - } - retval=octave_value(m); - } - } - else - print_usage (); + // The next two functions called pretend to have been called + // from 'load' in their error messages, read_binary_data also + // wants to have the filename ... - return retval; + if (read_binary_file_header (ps, swap, flt_fmt, false) < 0) + return retval; -} + name = read_binary_data (ps, swap, flt_fmt, "", global, tc, doc); + // read_binary_data will give no error with EOF at start + // of reading, but in this case it is an error, since + // after the header exactly one variable is expected. This + // is mended by asking for EOF here. -/* -;;; Local Variables: *** -;;; mode: C++ *** -;;; End: *** -*/ + if (ps.eof () || error_state || name.empty ()) + { + error ("error in reading variable data\n"); + return retval; + } + + if (! tc.is_defined ()) + { + // What means this? + error ("error in reading variable\n"); + return retval; + } + + + return retval = tc; +} Modified: trunk/octave-forge/main/parallel/src/sclose.cc =================================================================== --- trunk/octave-forge/main/parallel/src/sclose.cc 2010-08-13 12:02:38 UTC (rev 7525) +++ trunk/octave-forge/main/parallel/src/sclose.cc 2010-08-13 13:27:35 UTC (rev 7526) @@ -18,6 +18,7 @@ */ #include <octave/oct.h> +#include <oct-stream.h> #include "defun-dld.h" #include "dirfns.h" @@ -53,9 +54,10 @@ if(args.length () == 1) { - int i,nsock=0,sock,k,err=0,nl; + int i,nsock=0,sock,k,err=0,nl, rows; - nsock=args(0).matrix_value().rows()*2; + rows = args(0).matrix_value().rows(); + nsock = rows * 2; if((int)args(0).matrix_value().data()[0]==0){ int num,pid; @@ -99,13 +101,20 @@ } } - for(i=nsock-1;i>=0;i--){ + for(i=nsock-1;i>=rows;i--){ sock=(int)args(0).matrix_value().data()[i]; if(sock!=0){ if(close(sock)!=0) err++; } } + for(i=rows-1;i>=0;i--){ + sock=(int)args(0).matrix_value().data()[i]; + if(sock!=0){ + if(octave_stream_list::remove (octave_value (sock), "") != 0) + err++; + } + } if(err) error("sclose error %d",err); retval=(double)err; Deleted: trunk/octave-forge/main/parallel/src/select_sockets.cc =================================================================== --- trunk/octave-forge/main/parallel/src/select_sockets.cc 2010-08-13 12:02:38 UTC (rev 7525) +++ trunk/octave-forge/main/parallel/src/select_sockets.cc 2010-08-13 13:27:35 UTC (rev 7526) @@ -1,140 +0,0 @@ -// Copyright (C) 2010 Olaf Till <ola...@un...> - -// This program is free software; you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation; either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with this program; if not, write to the Free Software -// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - -#include <octave/oct.h> -#include <octave/oct-stream.h> - -#ifdef POSIX -#include <sys/select.h> -#else -#include <sys/time.h> -#include <sys/types.h> -#include <unistd.h> -#endif -#include <errno.h> -#include <map> - -DEFUN_DLD (select_sockets, args, nargout, - "-*- texinfo -*-\n\ -@deftypefn {Loadable Function} {[@var{n}, @var{idx}] =} select_sockets (@var{sockets}, @var{timeout}[, @var{nfds}])\n\ -Calls Unix @code{select}.\n\ -@var{sockets}: valid sockets matrix as returned by @code{connect}.\n\ -@var{timeout}: seconds, negative for infinite.\n\ -@var{nfds}: optional, default is Unix' FD_SETSIZE (platform specific).\n\ -Passed to Unix @code{select} as the first argument --- see there.\n\ -An error is returned if nfds or a watched filedescriptor \n\ -plus one exceeds FD_SETSIZE.\n\ -Return values are:\n\ -@var{idx}: index vector to rows in @var{sockets} with pending input\n\ -readable with @code{recv}.\n\ -@var{n}: number of rows in @var{sockets} with pending input.\n\ -@end deftypefn") -{ - octave_value_list retval; - int nargin = args.length (); - int i, fid, nfds, n, nr, act; - double argtout, *fvec; - timeval tout; - timeval *timeout = &tout; - ColumnVector read_fds; - - if (nargin == 2) - nfds = FD_SETSIZE; - else if (nargin == 3) { - if (! args(2).is_real_scalar ()) { - error ("'nfds' must be a real scalar.\n"); - return retval; - } - nfds = args(2).int_value (); - if (nfds <= 0) { - error ("'nfds' should be greater than zero.\n"); - return retval; - } - if (nfds > FD_SETSIZE) { - error ("'nfds' exceeds systems maximum given by FD_SETSIZE.\n"); - return retval; - } - } - else { - error ("two or three arguments required.\n"); - return retval; - } - if (! args(1).is_real_scalar ()) { - error ("'timeout' must be a real scalar.\n"); - return retval; - } - if ((argtout = args(1).double_value ()) < 0) - timeout = NULL; - else { - double ipart, fpart; - fpart = modf (argtout, &ipart); - tout.tv_sec = lrint (ipart); - tout.tv_usec = lrint (fpart * 1000); - } - if ((nr = args(0).matrix_value().rows()) < 2 || - args(0).matrix_value().columns() != 3) { - error ("First argument must be a valid sockets matrix as returned by 'connect'\n"); - return retval; - } - read_fds = ColumnVector (args(0).matrix_value().column(0)); - - fd_set rfds; - FD_ZERO (&rfds); - for (i = 1; i < read_fds.length (); i++) { - fid = lrint (read_fds(i)); - if (fid >= FD_SETSIZE) { - error ("filedescriptor >= FD_SETSIZE"); - return retval; - } - FD_SET (fid, &rfds); - } - - if ((n = select (nfds, &rfds, NULL, NULL, timeout)) == -1) { - std::string err; - switch (errno) { - case EBADF: - err = "EBADF"; - break; - case EINTR: - err = "EINTR"; - break; - case EINVAL: - err = "EINVAL"; - break; - default: - err = "unknown error"; - } - error ("unix select returned error: %s\n", - err.c_str ()); - return retval; - } - if (nargout > 1) { - for (i = 1, act = 0; i < read_fds.length (); i++) - if (FD_ISSET (lrint (read_fds(i)), &rfds)) act++; - RowVector ridx = RowVector (act); - for (i = 1, fvec = ridx.fortran_vec (); - i < read_fds.length (); i++) - if (FD_ISSET (lrint (read_fds(i)), &rfds)) { - *fvec = double (i + 1); - fvec++; - } - retval(1) = ridx; - } - - retval(0) = n; - - return retval; -} Modified: trunk/octave-forge/main/parallel/src/send.cc =================================================================== --- trunk/octave-forge/main/parallel/src/send.cc 2010-08-13 12:02:38 UTC (rev 7525) +++ trunk/octave-forge/main/parallel/src/send.cc 2010-08-13 13:27:35 UTC (rev 7526) @@ -1,270 +1,76 @@ -/* +// Copyright (C) 2010 Olaf Till <ola...@un...> -Copyright (C) 2002 Hayato Fujiwara +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation; either version 3 of the License, or +// (at your option) any later version. -This program is free software; you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation; either version 2 of the License, or -(at your option) any later version. +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -You should have received a copy of the GNU General Public License -along with this program; If not, see <http://www.gnu.org/licenses/>. +#include <octave/oct.h> +#include <octave/load-save.h> +#include <octave/ls-oct-binary.h> +#include <octave/oct-stream.h> -*/ +DEFUN_DLD (send, args, , "send (X, sockets)\n\ +\n\ +Send the variable 'X' to the computers specified by matrix 'sockets'\n.") +{ + octave_value retval; -#include <octave/oct.h> -#include <map> + if (args.length () != 2) + { + error ("two arguments required\n"); -#include "defun-dld.h" -#include "dirfns.h" -#include "error.h" -#include "help.h" -#include "oct-map.h" -#include "systime.h" -#include "ov.h" -#include "oct-obj.h" -#include "utils.h" -#include "oct-env.h" + return retval; + } -#include <stdio.h> -#include <sys/types.h> -#include <sys/socket.h> -#include <sys/poll.h> -#include <netinet/in.h> -#include <errno.h> -#include <netdb.h> -#include <unistd.h> + Matrix sockets = args(1).matrix_value (); -// SSIZE_MAX might be for 64-bit. Limit to 2^31-1 -#define BUFF_SIZE 2147483647 + if (error_state) + return retval; -// COMM + int rows = sockets.rows (); -DEFUN_DLD (send, args, , - "send (X,sockets)\n\ -\n\ -Send the variable 'x' to the computers specified by matrix 'sockets'.") -{ - octave_value retval; - - typedef std::map<std::string, octave_value_list>::iterator iterator; - typedef std::map<std::string, octave_value_list>::const_iterator const_iterator; + double sid; - if(args.length () ==2) + for (int id = 0; id < rows; id++) { - octave_value val=args(0); - Matrix sock_m=args(1).matrix_value(); - int sock,i,k,error_code,nl; - int nsock=sock_m.rows(); - int type_id=0; //=val.type_id(); + if ((sid = sockets(id, 0)) != 0) + { + octave_stream os = octave_stream_list::lookup + (octave_value (sid), "send"); - if((int)sock_m.data()[0]==0){ - int num,pid; - struct pollfd *pollfd; - pollfd=(struct pollfd *)malloc(nsock*sizeof(struct pollfd)); - for(i=0;i<nsock;i++){ - sock=(int)sock_m.data()[i+nsock]; - pollfd[i].fd=sock; - pollfd[i].events=0; - pollfd[i].events=POLLIN|POLLERR|POLLHUP; - } - - num=poll(pollfd,nsock,0); - if(num){ - for(k=0;k<nsock;k++){ - if(pollfd[k].revents && (pollfd[k].fd !=0)){ - sockaddr_in r_addr; - struct hostent *hehe; - socklen_t len = sizeof(r_addr); - getpeername(pollfd[k].fd, (sockaddr*)&r_addr, &len ); - hehe=gethostbyaddr((char *)&r_addr.sin_addr.s_addr,sizeof(r_addr.sin_addr), AF_INET); - - if(pollfd[k].revents&POLLIN){ - pid=getpid(); - read(pollfd[k].fd,&nl,sizeof(int)); - error_code=ntohl(nl); - write(pollfd[k].fd,&nl,sizeof(int)); - error("error occurred in %s\n\tsee %s:/tmp/octave_error-%s_%5d.log for detail",hehe->h_name,hehe->h_name,hehe->h_name,pid ); - } - if(pollfd[k].revents&POLLERR){ - error("Error condition - %s",hehe->h_name ); - } - if(pollfd[k].revents&POLLHUP){ - error("Hung up - %s",hehe->h_name ); - } + if (error_state) + { + error ("no valid stream id\n"); + + return retval; } - } - } - } + if (! os.is_open ()) + { + error ("stream not open\n"); - if(val.is_real_matrix() && !(val.is_char_matrix())) - { - Matrix m=val.matrix_value(); - int row=m.rows(); - int col=m.columns(); - int length=0,count,r_len; - - const double *tmp=m.data(); - type_id=3; - for (i=0;i<nsock;i++){ - sock=(int)sock_m.data()[i]; - if(sock!=0){ - nl=htonl(type_id); - write(sock,&nl,sizeof(int)); - nl=htonl(row); - write(sock,&nl,sizeof(int)); - nl=htonl(col); - write(sock,&nl,sizeof(int)); - length=sizeof(double)*col*row; - errno=0; - count=0; - r_len=BUFF_SIZE; - while(count <length){ - if((length-count) < BUFF_SIZE) - r_len=length-count; - count +=write(sock,(tmp+(count/sizeof(double))),r_len); - } - - // write(sock,m.data(),length); + return retval; } - } - + + std::ostream *tps = os.output_stream (); + std::ostream& ps = *tps; + + write_header (ps, LS_BINARY); + + save_binary_data (ps, args(0), "a", "", false, false); + + ps.flush (); } - else if(val.is_real_scalar()) - { - double d=val.double_value(); - int length=sizeof(d); - - type_id=1; - for (i=0;i<nsock;i++){ - sock=(int)sock_m.data()[i]; - if(sock!=0){ - nl=htonl(type_id); - write(sock,&nl,sizeof(int)); - write(sock,&d,length); - } - } - } - else if(val.is_complex_matrix()) - { - ComplexMatrix m=val.complex_matrix_value(); - int row=m.rows(); - int col=m.columns(); - int length=0,count,r_len; - - type_id=4; - const Complex *tmp=m.data(); - for (i=0;i<nsock;i++){ - sock=(int)sock_m.data()[i]; - if(sock!=0){ - nl=htonl(type_id); - write(sock,&nl,sizeof(int)); - nl=htonl(row); - write(sock,&nl,sizeof(int)); - nl=htonl(col); - write(sock,&nl,sizeof(int)); - length=sizeof(Complex)*col*row; - count=0; - r_len=BUFF_SIZE; - while(count <length){ - if((length-count) < BUFF_SIZE) - r_len=length-count; - count +=write(sock,(tmp+(count/sizeof(Complex))),r_len); - } - // write(sock,m.data(),length); - } - } - } - else if(val.is_complex_scalar()) - { - Complex cx=val.complex_value(); - int length=sizeof(cx); - - type_id=2; - for (i=0;i<nsock;i++){ - sock=(int)sock_m.data()[i]; - if(sock!=0){ - nl=htonl(type_id); - write(sock,&nl,sizeof(int)); - write(sock,&cx,length); - } - } - } - else if(val.is_char_matrix()) - { - int row=val.rows(); - int col=val.columns(); - int length=sizeof(char)*row*col,count,r_len; - charMatrix cmx=val.char_matrix_value(); - - type_id=6; - const char *tmp=cmx.data(); - for (i=0;i<nsock;i++){ - sock=(int)sock_m.data()[i]; - if(sock!=0){ - nl=htonl(type_id); - write(sock,&nl,sizeof(int)); - nl=htonl(row); - write(sock,&nl,sizeof(int)); - nl=htonl(col); - write(sock,&nl,sizeof(int)); - count=0; - r_len=BUFF_SIZE; - while(count <length){ - if((length-count) < BUFF_SIZE) - r_len=length-count; - count +=write(sock,(tmp+(count/sizeof(char))),r_len); - } - // write(sock,cmx.data(),length); - } - } - } - else if(val.is_map()) - { - Octave_map map=val.map_value(); - octave_value_list ov_list; - Cell c; - int i,length=map.nfields(),key_len=0; - string_vector key=map.keys(); - - for (i=0;i<nsock;i++){ - sock=(int)sock_m.data()[i]; - ov_list(1)=octave_value(sock_m.row(i)); - type_id=7; - if(sock!=0){ - nl=htonl(type_id); - write(sock,&nl,sizeof(int)); - nl=htonl(length); - write(sock,&nl,sizeof(int)); - for(i=0;i<length;i++){ - key_len=key[i].length(); - nl=htonl(key_len); - write(sock,&nl,sizeof(int)); - c=map.contents(key[i]); - ov_list(0)=c(0); - write(sock,key[i].c_str(), key_len); - Fsend(ov_list,0); - } - } - } - } - else - error("unsupported type %d",type_id); } - else - print_usage (); - + return retval; - } - -/* -;;; Local Variables: *** -;;; mode: C++ *** -;;; End: *** -*/ Added: trunk/octave-forge/main/parallel/src/sock-stream.cc =================================================================== --- trunk/octave-forge/main/parallel/src/sock-stream.cc (rev 0) +++ trunk/octave-forge/main/parallel/src/sock-stream.cc 2010-08-13 13:27:35 UTC (rev 7526) @@ -0,0 +1,149 @@ +// Copyright (C) 2010 Olaf Till + +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation; either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +#include <octave/oct.h> + +#include <errno.h> + +#include <oct-stream.h> +#include <oct-prcstrm.h> + +static void handle_errno (const std::string msg, const int err_no) +{ + std::string err; + + switch (err_no) + { + case EINVAL: + err = "EINVAL"; + break; + case ENOMEM: + err = "ENOMEM"; + break; + case EACCES: + err = "EACCES"; + break; + case EAGAIN: + err = "EAGAIN"; + break; + case EEXIST: + err = "EEXIST"; + break; + case EFAULT: + err = "EFAULT"; + break; + case EFBIG: + err = "EFBIG"; + break; + case EINTR: + err = "EINTR"; + break; + case EISDIR: + err = "EISDIR"; + break; + case ELOOP: + err = "ELOOP"; + break; + case EMFILE: + err = "EMFILE"; + break; + case ENAMETOOLONG: + err = "ENAMETOOLONG"; + break; + case ENFILE: + err = "ENFILE"; + break; + case ENODEV: + err = "ENODEV"; + break; + case ENOENT: + err = "ENOENT"; + break; + case ENOSPC: + err = "ENOSPC"; + break; + case ENOTDIR: + err = "ENOTDIR"; + break; + case ENXIO: + err = "ENXIO"; + break; + case EPERM: + err = "EPERM"; + break; + case EROFS: + err = "EROFS"; + break; + case ETXTBSY: + err = "ETXTBSY"; + break; + case EBADF: + err = "EBADF"; + break; + case EDEADLK: + err = "EDEADLK"; + break; + case ENOLCK: + err = "ENOLCK"; + break; + default: + err = "unknown error"; + } + + error ("%s: %s\n", msg.c_str (), err.c_str ()); +} + +int socket_to_oct_iostream (int sd) +{ + errno = 0; + + FILE *fid = fdopen (sd, "rb+"); + + if (! fid) + { + handle_errno + ("could not get C stream from socket descriptor with 'fdopen'", + errno); + + return -1; + } + + oct_mach_info::float_format flt_fmt = + oct_mach_info::string_to_float_format ("ieee-le"); + + std::ios::openmode md = std::ios::in | std::ios::out; + + octave_stream os = + octave_stdiostream::create ("", fid, md, flt_fmt); + + if (! os) + { + error ("could not get Octave stream from C stream"); + + return -1; + } + + octave_stream_list::insert (os); + + return sd; // Octave assigns the same number as in 'sd' +} + + +/* +;;; Local Variables: *** +;;; mode: C++ *** +;;; End: *** +*/ Added: trunk/octave-forge/main/parallel/src/sock-stream.h =================================================================== --- trunk/octave-forge/main/parallel/src/sock-stream.h (rev 0) +++ trunk/octave-forge/main/parallel/src/sock-stream.h 2010-08-13 13:27:35 UTC (rev 7526) @@ -0,0 +1,25 @@ +// Copyright (C) 2010 Olaf Till + +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation; either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + + +int socket_to_oct_iostream (int sd); + + +/* +;;; Local Variables: *** +;;; mode: C++ *** +;;; End: *** +*/ This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <i7...@us...> - 2010-08-20 09:15:47
|
Revision: 7557 http://octave.svn.sourceforge.net/octave/?rev=7557&view=rev Author: i7tiol Date: 2010-08-20 09:15:40 +0000 (Fri, 20 Aug 2010) Log Message: ----------- Slight cleanup of signal handling and exiting. Modified Paths: -------------- trunk/octave-forge/main/parallel/doc/README.parallel trunk/octave-forge/main/parallel/inst/select_sockets.m trunk/octave-forge/main/parallel/src/connect.cc trunk/octave-forge/main/parallel/src/pserver.cc trunk/octave-forge/main/parallel/src/reval.cc trunk/octave-forge/main/parallel/src/sock-stream.h Added Paths: ----------- trunk/octave-forge/main/parallel/inst/__pserver_exit__.m Modified: trunk/octave-forge/main/parallel/doc/README.parallel =================================================================== --- trunk/octave-forge/main/parallel/doc/README.parallel 2010-08-19 07:41:03 UTC (rev 7556) +++ trunk/octave-forge/main/parallel/doc/README.parallel 2010-08-20 09:15:40 UTC (rev 7557) @@ -15,6 +15,8 @@ 1. GNU Octave >= 3.0.0 that is available at http://www.octave.org/ 2. two or more computers + 3. identical versions of package "parallel" installed on these + computers Security note @@ -22,16 +24,18 @@ Commands are sent to slave machines over TCP connections to port 12502, data is sent between machines over TCP connections to port 12501. With the current version, it is your own responsibility to - secure these ports against unauthorized access. + secure these ports against unauthorized access. There is no + ecryption. How to use + Note first that this package assumes the use in a multiple-computer system consisting of a master (your console) computer and some slave computers. Run Octave with argument "server.m" on every slave - computer. + computer, e.g. (the actual path to server.m may differ): - $ octave /usr/local/share/octave/2.1.43/site/m/octave-forge/parallel/server.m + $ octave /usr/share/octave/packages/parallel-...version.../server.m Run Octave without argument "server.m" on the master computer @@ -89,12 +93,11 @@ reval ([ "a=[ 1:3 ]"; "a=a'*a" ],socket(2,:)); -sclose (sockets) +scloseall (sockets) Close the connections specified by the matrix "sockets". - Returns zero on success, or nonzero if an error occurred. For example, - sclose (sockets); + scloseall (sockets); select_sockets: [N, IDX] = select_sockets (SOCKETS, TIMEOUT[, NFDS]) See help-text of this function. Returns an index to rows in "sockets" @@ -112,14 +115,14 @@ clear; hosts = [ "host1"; "host2"; "host3" ]; -sockets = connect(hosts); -psum = zeros(1,2); -reval( "send(sum([1:50]),sockets(1,:))", sockets(2,:)); -reval( "send(sum([51:100]),sockets(1,:))", sockets(3,:)); -psum(1) = recv(sockets(2,:)); -psum(2) = recv(sockets(3,:)); -sum(psum) -scloseall(sockets); +sockets = connect (hosts); +psum = zeros (1, 2); +reval ("send (sum (1:50), sockets(1, :))", sockets(2, :)); +reval ("send (sum (51:100), sockets(1, :))", sockets(3, :)); +psum(1) = recv (sockets(2, :)); +psum(2) = recv (sockets(3, :)); +sum (psum) +scloseall (sockets); In the following script, the variable s="Hello, again!" @@ -128,20 +131,21 @@ clear; hosts = [ "host1"; "host2"; "host3"; "host4" ]; sockets = connect(hosts); -s="Hello, again!"; -send(s,sockets(2,:)); # for larger data than contained in "s", this command should be given _after_ the following "reval" commands -reval( "s=recv(sockets(1,:));",sockets(2,:)); -reval( "send(s,sockets(3,:));",sockets(2,:)); -reval( "s=recv(sockets(2,:));",sockets(3,:)); -reval( "send(s,sockets(4,:));",sockets(3,:)); -reval( "s=recv(sockets(3,:));",sockets(4,:)); -reval( "send(s,sockets(1,:));",sockets(4,:)); -s2=recv(sockets(4,:)) -scloseall(sockets); +s = "Hello, again!"; +send (s,sockets(2,:)); # for larger data than contained in "s", this command should be given _after_ the following "reval" commands +reval ("s = recv (sockets(1, :));", sockets(2, :)); +reval ("send (s, sockets(3, :));", sockets(2, :)); +reval ("s = recv (sockets(2, :));", sockets(3, :)); +reval ("send (s, sockets(4,:));", sockets(3, :)); +reval ("s = recv (sockets(3,:));", sockets(4, :)); +reval ("send (s, sockets(1, :));",sockets(4, :)); +s2 = recv (sockets(4,:)) +scloseall (sockets); License: + This package is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation. See individual files for GPL Version. @@ -151,4 +155,4 @@ Comments and suggestions should be directed to: h_f...@us... -or to the current package maintainer. \ No newline at end of file +or to the current package maintainer. Added: trunk/octave-forge/main/parallel/inst/__pserver_exit__.m =================================================================== --- trunk/octave-forge/main/parallel/inst/__pserver_exit__.m (rev 0) +++ trunk/octave-forge/main/parallel/inst/__pserver_exit__.m 2010-08-20 09:15:40 UTC (rev 7557) @@ -0,0 +1,31 @@ +## Copyright (C) 2010 Olaf Till <ola...@un...> +## +## This program is free software; you can redistribute it and/or modify +## it under the terms of the GNU General Public License as published by +## the Free Software Foundation; either version 3 of the License, or +## (at your option) any later version. +## +## This program is distributed in the hope that it will be useful, +## but WITHOUT ANY WARRANTY; without even the implied warranty of +## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +## GNU General Public License for more details. +## +## You should have received a copy of the GNU General Public License +## along with this program; if not, write to the Free Software +## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +function __pserver_exit__ (hn) + + ## Will be initialized and then registered as an exit function of + ## Octave, so pserver does not have to wrap Octaves clean_up_and_exit + ## and Octaves sigterm handler. + + persistent hostname; + + if (nargin == 1) + hostname = hn; + else + fprintf (stderr, "exiting, %s\n", hostname); + endif + +endfunction \ No newline at end of file Modified: trunk/octave-forge/main/parallel/inst/select_sockets.m =================================================================== --- trunk/octave-forge/main/parallel/inst/select_sockets.m 2010-08-19 07:41:03 UTC (rev 7556) +++ trunk/octave-forge/main/parallel/inst/select_sockets.m 2010-08-20 09:15:40 UTC (rev 7557) @@ -50,3 +50,5 @@ [n, ridx] = \ select (cat \ (2, {varargin{1}(:, 1)}, {[], []}, varargin(2:end)){:}); + +endfunction Modified: trunk/octave-forge/main/parallel/src/connect.cc =================================================================== --- trunk/octave-forge/main/parallel/src/connect.cc 2010-08-19 07:41:03 UTC (rev 7556) +++ trunk/octave-forge/main/parallel/src/connect.cc 2010-08-20 09:15:40 UTC (rev 7557) @@ -20,31 +20,14 @@ // TODO: error handling is a mess #include <octave/oct.h> +#include <oct-env.h> -#include "defun-dld.h" -#include "dirfns.h" -#include "error.h" -#include "help.h" -#include "oct-map.h" -#include "systime.h" -#include "ov.h" -#include "oct-obj.h" -#include "utils.h" -#include "oct-env.h" - -#include <stdio.h> -#include <sys/types.h> #include <sys/socket.h> -#include <netinet/in.h> #include <errno.h> #include <netdb.h> -#include <unistd.h> -#include <netinet/in.h> #include "sock-stream.h" -#define BUFF_SIZE SSIZE_MAX - // COMM static void read_if_no_error (int fd, void *buf, size_t count, int est) { @@ -209,7 +192,7 @@ else { - int bufsize=262144; + int bufsize = BUFF_SIZE; socklen_t ol; ol=sizeof(bufsize); setsockopt(sock,SOL_SOCKET,SO_SNDBUF,&bufsize,ol); Modified: trunk/octave-forge/main/parallel/src/pserver.cc =================================================================== --- trunk/octave-forge/main/parallel/src/pserver.cc 2010-08-19 07:41:03 UTC (rev 7556) +++ trunk/octave-forge/main/parallel/src/pserver.cc 2010-08-20 09:15:40 UTC (rev 7557) @@ -19,112 +19,43 @@ #include <octave/oct.h> -#include "defun-dld.h" -#include "dirfns.h" -#include "error.h" -#include "help.h" -#include "oct-map.h" -#include "systime.h" -#include "ov.h" -#include "oct-obj.h" -#include "utils.h" -#include "oct-env.h" -#include "file-io.h" -#include "sighandlers.h" -#include "parse.h" -#include "cmd-edit.h" -#include "variables.h" -#include "toplev.h" -#include "sysdep.h" -#include "oct-prcstrm.h" -#include "oct-stream.h" -#include "oct-strstrm.h" -#include "oct-iostrm.h" -#include "unwind-prot.h" -#include "input.h" -#include "quit.h" +#include <oct-env.h> +#include <file-io.h> +#include <sighandlers.h> +#include <parse.h> +#include <cmd-edit.h> +#include <toplev.h> -#include <iostream> -#include <stdio.h> -#include <sys/types.h> #include <sys/socket.h> +#include <iostream> #include <sys/stat.h> -#include <sys/wait.h> #include <sys/poll.h> -#include <netinet/in.h> #include <errno.h> -#include <signal.h> #include <netdb.h> -#include <unistd.h> -#include <setjmp.h> -#include <netinet/in.h> #include "sock-stream.h" -// SSIZE_MAX might be for 64-bit. Limit to 2^31-1 -#define BUFF_SIZE 2147483647 +/* children are not killed on parent exit; for that octave_child_list + can not be used and an own SIGCHLD handler is needed */ -// Handle server SIGTERM SIGQUIT - -static RETSIGTYPE -sigterm_handler (int /* sig */) +static +bool pserver_child_event_handler (pid_t pid, int ev) { - int len=118; - char hostname[120],pidname[128]; - gethostname(hostname,len); - sprintf(pidname,"/tmp/.octave-%s.pid",hostname); - remove (pidname); - close_files (); - - std::cerr << "exiting, " <<hostname <<std::endl; - cleanup_tmp_files (); - exit(0); - + return 1; // remove child from octave_child_list } -static RETSIGTYPE -sigchld_handler(int /* sig */) -{ - int status; - /* Reap all childrens */ - while (waitpid(-1, &status, WNOHANG) > 0) - ; - signal(SIGCHLD, sigchld_handler); -} - -int +void reval_loop (int sock) { - // Allow the user to interrupt us without exiting. + // The big loop. + int len=0; char *ev_str; std::string s; - octave_save_signal_mask (); - - if (octave_set_current_context) - { -#if defined (USE_EXCEPTIONS_FOR_INTERRUPTS) - panic_impossible (); -#else - unwind_protect::run_all (); - raw_mode (0); - std::cout << "\n"; - octave_restore_signal_mask (); -#endif - } - - can_interrupt = true; - - octave_catch_interrupts (); - - octave_initialized = true; - - // The big loop. - char dummy; read(sock,&dummy,sizeof(char)); - int retval,count,r_len,num,fin,nl; + int p_err,count,r_len,num,fin,nl; struct pollfd *pollfd; pollfd=(struct pollfd *)malloc(sizeof(struct pollfd)); @@ -132,7 +63,7 @@ pollfd[0].events=0; pollfd[0].events=POLLIN|POLLERR|POLLHUP; - do + while (true) // function does not return { pollfd[0].revents=0; num=poll(pollfd,1,-1); @@ -166,16 +97,23 @@ ev_str[len]='\0'; s=(std::string)ev_str; - eval_string(s,false,retval,0); + eval_string(s,false,p_err,0); delete(ev_str); + nl = 0; if (error_state) { - nl=htonl(error_state); - write(sock,&nl,sizeof(int)); - read(sock,&nl,sizeof(int)); - // clean_up_and_exit_server (retval); + nl = 1; + error_state = 0; } + else if (p_err) + nl = 1; + if (nl) + { + nl = htonl (nl); + write (sock, &nl, sizeof (int)); + read (sock, &nl, sizeof (int)); + } else { if (octave_completion_matches_called) @@ -183,12 +121,8 @@ else command_editor::increment_current_command_number (); } - // Blocking Execution - // write(sock,&error_state,sizeof(error_state)); } - while (retval == 0); - return retval; } DEFUN_DLD (pserver,,, @@ -208,22 +142,28 @@ clean_up_and_exit (1); } + // initialize exit function + feval ("__pserver_exit__", octave_value (hostname), 0); + if (fork()) clean_up_and_exit(0); - /* Touch lock file. */ + // register exit function + feval ("atexit", octave_value ("__pserver_exit__"), 0); + + /* Touch lock file, mark for deletion. */ ppid=getpid(); + mark_for_deletion (pidname); pidfile = fopen (pidname, "w"); fprintf(pidfile,"%d\n",ppid); fclose(pidfile); std::cout <<pidname<<std::endl; - /* */ - signal(SIGCHLD, sigchld_handler); - signal(SIGTERM,sigterm_handler); - signal(SIGQUIT,sigterm_handler); + // avoid dumping octave_core if killed by a signal + feval ("sigterm_dumps_octave_core", octave_value (0), 0); + feval ("sighup_dumps_octave_core", octave_value (0), 0); - /* Redirect stdin, stdout, and stderr to /dev/null. */ + /* Redirect stdin and stdout to /dev/null. */ freopen("/dev/null", "r", stdin); freopen("/dev/null", "w", stdout); @@ -277,7 +217,7 @@ dsock=socket(PF_INET,SOCK_STREAM,0); if(dsock==-1){ perror("socket : "); - exit(-1); + clean_up_and_exit(-1); } addr=(struct sockaddr_in *) calloc(1,sizeof(struct sockaddr_in)); @@ -289,11 +229,11 @@ if(bind(dsock,(struct sockaddr *) addr,sizeof(*addr))!=0){ perror("bind : "); - exit(-1); + clean_up_and_exit(-1); } if(listen(dsock,SOMAXCONN)!=0){ perror("listen : "); - exit(-1); + clean_up_and_exit(-1); } free(addr); int param=1; @@ -318,6 +258,18 @@ /* Normal production daemon. Fork, and have the child process the connection. The parent continues listening. */ + // remove non-existing children from octave_child_list + OCTAVE_QUIT; + + sigset_t nset, oset, dset; + + BLOCK_CHILD (nset, oset); + BLOCK_SIGNAL (SIGTERM, nset, dset); + BLOCK_SIGNAL (SIGHUP, nset, dset); + + // restores all signals to state before BLOCK_CHILD +#define RESTORE_SIGNALS(ovar) UNBLOCK_CHILD(ovar) + if((pid=fork())==-1) { perror("fork "); @@ -330,6 +282,8 @@ signal(SIGTERM,SIG_DFL); signal(SIGQUIT,SIG_DFL); + RESTORE_SIGNALS (oset); + val=1; ol=sizeof(val); setsockopt(asock,SOL_SOCKET,SO_REUSEADDR,&val,ol); @@ -365,7 +319,7 @@ dasock=accept(dsock,(sockaddr *)&rem_addr,(socklen_t *)&len); if(dasock==-1){ perror("accept dat "); - exit(-1); + _exit(-1); } int bufsize=BUFF_SIZE; socklen_t ol; @@ -438,7 +392,7 @@ dsock=socket(PF_INET,SOCK_STREAM,0); if(dsock==-1){ perror("socket : "); - exit(-1); + _exit(-1); } addr=(struct sockaddr_in *) calloc(1,sizeof(struct sockaddr_in)); @@ -455,7 +409,7 @@ break; }else if(errno!=ECONNREFUSED){ perror("connect : "); - exit(-1); + _exit(-1); }else { usleep(5000); } @@ -511,11 +465,6 @@ } free(host_list); - //normal act - install_signal_handlers (); - - atexit (do_octave_atexit); - char * s; int stat; @@ -545,21 +494,21 @@ if(cd_ok != true){ octave_env::chdir ("/tmp"); } - int retval = reval_loop(asock); - - if (retval == 1 && ! error_state) - retval = 0; - - close(asock); - clean_up_and_exit (retval); + reval_loop (asock); // does not return } + + // parent + + octave_child_list::insert (pid, pserver_child_event_handler); + + RESTORE_SIGNALS (oset); close(asock); } close(sock); - exit(-1); + clean_up_and_exit(-1); } Modified: trunk/octave-forge/main/parallel/src/reval.cc =================================================================== --- trunk/octave-forge/main/parallel/src/reval.cc 2010-08-19 07:41:03 UTC (rev 7556) +++ trunk/octave-forge/main/parallel/src/reval.cc 2010-08-20 09:15:40 UTC (rev 7557) @@ -105,6 +105,9 @@ } } + if (error_state) + return retval; + for(i=0;i<nsock;i++){ sock=(int)sock_m.data()[i+nsock]; if(sock!=0){ Modified: trunk/octave-forge/main/parallel/src/sock-stream.h =================================================================== --- trunk/octave-forge/main/parallel/src/sock-stream.h 2010-08-19 07:41:03 UTC (rev 7556) +++ trunk/octave-forge/main/parallel/src/sock-stream.h 2010-08-20 09:15:40 UTC (rev 7557) @@ -14,9 +14,18 @@ // along with this program; if not, write to the Free Software // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +#if !defined (sock_stream_h) +#define sock_stream_h +// (this comment and the definition have been taken from pserver.cc +// and were probably from Hayato Fujiwara) +// +// SSIZE_MAX might be for 64-bit. Limit to 2^31-1 +#define BUFF_SIZE 2147483647 + int socket_to_oct_iostream (int sd); +#endif /* ;;; Local Variables: *** This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <i7...@us...> - 2010-08-24 08:14:01
|
Revision: 7563 http://octave.svn.sourceforge.net/octave/?rev=7563&view=rev Author: i7tiol Date: 2010-08-24 08:13:53 +0000 (Tue, 24 Aug 2010) Log Message: ----------- Use return value of read/write everywhere and a bit more sensible. Remove never fullfilled condition for handling of remote errors in reval.cc. Modified Paths: -------------- trunk/octave-forge/main/parallel/inst/scloseall.m trunk/octave-forge/main/parallel/src/connect.cc trunk/octave-forge/main/parallel/src/pserver.cc trunk/octave-forge/main/parallel/src/reval.cc trunk/octave-forge/main/parallel/src/sclose.cc Removed Paths: ------------- trunk/octave-forge/main/parallel/src/swab.h Modified: trunk/octave-forge/main/parallel/inst/scloseall.m =================================================================== --- trunk/octave-forge/main/parallel/inst/scloseall.m 2010-08-23 21:02:33 UTC (rev 7562) +++ trunk/octave-forge/main/parallel/inst/scloseall.m 2010-08-24 08:13:53 UTC (rev 7563) @@ -17,7 +17,7 @@ ## scloseall (sockets) - reval("sclose(sockets);exit;",sockets); - sclose(sockets); + reval ("sclose (sockets); __exit__;", sockets); + sclose (sockets); endfunction Modified: trunk/octave-forge/main/parallel/src/connect.cc =================================================================== --- trunk/octave-forge/main/parallel/src/connect.cc 2010-08-23 21:02:33 UTC (rev 7562) +++ trunk/octave-forge/main/parallel/src/connect.cc 2010-08-24 08:13:53 UTC (rev 7563) @@ -30,13 +30,17 @@ // COMM -static void read_if_no_error (int fd, void *buf, size_t count, int est) { +static +void read_if_no_error (int fd, void *buf, size_t count, int est) +{ if (! est) if (read (fd, buf, count) < (ssize_t)count) error ("read error"); } -static void write_if_no_error (int fd, const void *buf, size_t count, int est) { +static +void write_if_no_error (int fd, const void *buf, size_t count, int est) +{ if (! est) if (write (fd, buf, count) < (ssize_t)count) error ("write error"); @@ -145,9 +149,13 @@ write_if_no_error(sock,&nl,sizeof(int),error_state); write_if_no_error(sock,directory.c_str(),comm_len,error_state); } - } - usleep(100); + if (error_state) + return retval; + } + + usleep(100); // why? + for(i=1;i<row;i++){ host=(char *)calloc(1,col+1); @@ -253,6 +261,9 @@ write_if_no_error((int)sock_v[i+row],&lf,sizeof(char),error_state); // cout << i+row <<endl; } + + if (error_state) + return retval; } else { Modified: trunk/octave-forge/main/parallel/src/pserver.cc =================================================================== --- trunk/octave-forge/main/parallel/src/pserver.cc 2010-08-23 21:02:33 UTC (rev 7562) +++ trunk/octave-forge/main/parallel/src/pserver.cc 2010-08-24 08:13:53 UTC (rev 7563) @@ -39,6 +39,26 @@ can not be used and an own SIGCHLD handler is needed */ static +void read_or_exit (int fd, void *buf, size_t count) +{ + if (read (fd, buf, count) < (ssize_t)count) + { + error ("read error"); + _exit (1); + } +} + +static +void write_or_exit (int fd, const void *buf, size_t count) +{ + if (write (fd, buf, count) < (ssize_t)count) + { + error ("write error"); + _exit (1); + } +} + +static bool pserver_child_event_handler (pid_t pid, int ev) { return 1; // remove child from octave_child_list @@ -60,8 +80,7 @@ pollfd=(struct pollfd *)malloc(sizeof(struct pollfd)); pollfd[0].fd=sock; - pollfd[0].events=0; - pollfd[0].events=POLLIN|POLLERR|POLLHUP; + pollfd[0].events=POLLIN; while (true) // function does not return { @@ -70,28 +89,35 @@ if(num){ if(pollfd[0].revents && (pollfd[0].fd !=0)){ if(pollfd[0].revents&POLLIN){ - fin=read(sock,&nl,sizeof(int)); + read_or_exit(sock,&nl,sizeof(int)); len=ntohl(nl); - if(!fin) - clean_up_and_exit (0); } if(pollfd[0].revents&POLLERR){ std::cerr <<"Error condition "<<std::endl; - clean_up_and_exit (POLLERR); + _exit (POLLERR); } if(pollfd[0].revents&POLLHUP){ std::cerr <<"Hung up "<<std::endl; - clean_up_and_exit (POLLHUP); + _exit (POLLHUP); } + if(pollfd[0].revents&POLLNVAL){ + std::cerr <<"fd not open "<<std::endl; + _exit (POLLNVAL); + } } } ev_str=new char[len+1]; count=0; r_len=BUFF_SIZE; - while(count <len){ + while(count < len){ if((len-count) < BUFF_SIZE) r_len=len-count; - count +=read(sock,(ev_str+count),r_len); + count += (fin = read (sock, (ev_str + count), r_len)); + if (fin <= 0) + { + error ("read error"); + _exit (1); + } } // read(sock,ev_str,len); ev_str[len]='\0'; @@ -111,8 +137,8 @@ if (nl) { nl = htonl (nl); - write (sock, &nl, sizeof (int)); - read (sock, &nl, sizeof (int)); + write_or_exit (sock, &nl, sizeof (int)); + read_or_exit (sock, &nl, sizeof (int)); } else { @@ -288,19 +314,19 @@ ol=sizeof(val); setsockopt(asock,SOL_SOCKET,SO_REUSEADDR,&val,ol); - read(asock,&nl,sizeof(int)); + read_or_exit(asock,&nl,sizeof(int)); num_nodes=ntohl(nl); - read(asock,&nl,sizeof(int)); + read_or_exit(asock,&nl,sizeof(int)); me=ntohl(nl); - read(asock,&nl,sizeof(int)); + read_or_exit(asock,&nl,sizeof(int)); pppid=ntohl(nl); sock_v=(int *)calloc((num_nodes+1)*3,sizeof(int)); host_list=(char **)calloc(num_nodes+1,sizeof(char *)); for(i=0;i<=num_nodes;i++){ - read(asock,&nl,sizeof(int)); + read_or_exit(asock,&nl,sizeof(int)); len=ntohl(nl); host_list[i]=(char *)calloc(len,sizeof(char)); - read(asock,host_list[i],len); + read_or_exit(asock,host_list[i],len); } sprintf(errname,"/tmp/octave_error-%s_%05d.log",hostname,pppid); @@ -331,13 +357,13 @@ setsockopt(dasock,SOL_SOCKET,SO_REUSEADDR,&bufsize,ol); //recv pppid (of connecting process at master) - read(dasock,&nl,sizeof(int)); + read_or_exit(dasock,&nl,sizeof(int)); rpppid=ntohl(nl); //recv name size - read(dasock,&nl,sizeof(int)); + read_or_exit(dasock,&nl,sizeof(int)); len=ntohl(nl); //recv name - read(dasock,rem_name,len+1); + read_or_exit(dasock,rem_name,len+1); rem_name[len]='\0'; for(j=0;j<me;j++){ @@ -353,7 +379,7 @@ if(result==0){ if(pppid==rpppid){ nl=htonl(result); - write(dasock,&nl,sizeof(int)); + write_or_exit(dasock,&nl,sizeof(int)); //send endian #if defined (__BYTE_ORDER) nl=htonl(__BYTE_ORDER); @@ -362,9 +388,9 @@ #else # error "can not determine the byte order" #endif - write(dasock,&nl,sizeof(int)); + write_or_exit(dasock,&nl,sizeof(int)); //recv endian - read(dasock,&nl,sizeof(int)); + read_or_exit(dasock,&nl,sizeof(int)); sock_v[j+2*(num_nodes+1)]=ntohl(nl); socket_to_oct_iostream (dasock); break; @@ -374,7 +400,7 @@ }else{ result=-1; nl=htonl(result); - write(dasock,&nl,sizeof(int)); + write_or_exit(dasock,&nl,sizeof(int)); close(dasock); sleep(1); } @@ -425,21 +451,21 @@ //send pppid nl=htonl(pppid); - write(dsock,&nl,sizeof(int)); + write_or_exit(dsock,&nl,sizeof(int)); //send name size len=strlen(host_list[me]); nl=htonl(len); - write(dsock,&nl,sizeof(int)); + write_or_exit(dsock,&nl,sizeof(int)); //send name - write(dsock,host_list[me],len+1); + write_or_exit(dsock,host_list[me],len+1); //recv result code - read(dsock,&nl,sizeof(int)); + read_or_exit(dsock,&nl,sizeof(int)); result=ntohl(nl); if(result==0){ sock_v[i]=dsock; //recv endian - read(dsock,&nl,sizeof(int)); + read_or_exit(dsock,&nl,sizeof(int)); sock_v[i+2*(num_nodes+1)]=ntohl(nl); //send endian #if defined (__BYTE_ORDER) @@ -449,7 +475,7 @@ #else # error "can not determine the byte order" #endif - write(dsock,&nl,sizeof(int)); + write_or_exit(dsock,&nl,sizeof(int)); socket_to_oct_iostream (dsock); break; }else{ @@ -486,10 +512,10 @@ char *newdir; int newdir_len; - read(asock,&nl,sizeof(int)); + read_or_exit(asock,&nl,sizeof(int)); newdir_len=ntohl(nl); newdir=(char *)calloc(sizeof(char),newdir_len+1); - read(asock,newdir,newdir_len); + read_or_exit(asock,newdir,newdir_len); int cd_ok=octave_env::chdir (newdir); if(cd_ok != true){ octave_env::chdir ("/tmp"); Modified: trunk/octave-forge/main/parallel/src/reval.cc =================================================================== --- trunk/octave-forge/main/parallel/src/reval.cc 2010-08-23 21:02:33 UTC (rev 7562) +++ trunk/octave-forge/main/parallel/src/reval.cc 2010-08-24 08:13:53 UTC (rev 7563) @@ -45,6 +45,22 @@ // COMM +static +void read_if_no_error (int fd, void *buf, size_t count, int est) +{ + if (! est) + if (read (fd, buf, count) < (ssize_t)count) + error ("read error"); +} + +static +void write_if_no_error (int fd, const void *buf, size_t count, int est) +{ + if (! est) + if (write (fd, buf, count) < (ssize_t)count) + error ("write error"); +} + DEFUN_DLD (reval, args, , "reval (commands,sockets)\n\ \n\ @@ -54,7 +70,7 @@ if(args.length () ==2) { - int sock,row=0,col=0,nsock=0,i,j,k; + int sock,row=0,col=0,nsock=0,i,j,k, fin; int error_code,count=0,r_len=0,nl; octave_value val=args(0); Matrix sock_m=args(1).matrix_value(); @@ -66,41 +82,52 @@ row=val.rows(); col=val.columns(); - if(sock_m.data()[0]==0){ - int num,pid; - struct pollfd *pollfd; - pollfd=(struct pollfd *)malloc(nsock*sizeof(struct pollfd)); - for(i=0;i<nsock;i++){ - sock=(int)sock_m.data()[i+nsock]; - pollfd[i].fd=sock; - pollfd[i].events=0; - pollfd[i].events=POLLIN|POLLERR|POLLHUP; - } - - num=poll(pollfd,nsock,0); - if(num){ - for(k=0;k<nsock;k++){ - if(pollfd[k].revents && (pollfd[k].fd !=0)){ - sockaddr_in r_addr; - struct hostent *hehe; - socklen_t len = sizeof(r_addr); - getpeername(pollfd[k].fd, (sockaddr*)&r_addr, &len ); - hehe=gethostbyaddr((char *)&r_addr.sin_addr.s_addr,sizeof(r_addr.sin_addr), AF_INET); - - if(pollfd[k].revents&POLLIN){ - pid=getpid(); - read(pollfd[k].fd,&nl,sizeof(int)); - error_code=ntohl(nl); - write(pollfd[k].fd,&nl,sizeof(int)); - error("error occurred in %s\n\tsee %s:/tmp/octave_error-%s_%5d.log for detail",hehe->h_name,hehe->h_name,hehe->h_name,pid ); - } - if(pollfd[k].revents&POLLERR){ - error("Error condition - %s",hehe->h_name ); - } - if(pollfd[k].revents&POLLHUP){ - error("Hung up - %s",hehe->h_name ); - } + int num,pid; + struct pollfd *pollfd; + pollfd=(struct pollfd *)malloc(nsock*sizeof(struct pollfd)); + for(i=0;i<nsock;i++){ + sock=(int)sock_m.data()[i+nsock]; + pollfd[i].fd=sock; + pollfd[i].events = POLLIN; + } + + num=poll(pollfd,nsock,0); + if(num){ + for(k=0;k<nsock;k++){ + if(pollfd[k].revents && (pollfd[k].fd !=0)){ + sockaddr_in r_addr; + struct hostent *hehe; + socklen_t len = sizeof(r_addr); + getpeername(pollfd[k].fd, (sockaddr*)&r_addr, &len ); + hehe=gethostbyaddr((char *)&r_addr.sin_addr.s_addr,sizeof(r_addr.sin_addr), AF_INET); + + if(pollfd[k].revents&POLLIN){ + pid=getpid(); + if (read (pollfd[k].fd, &nl, sizeof (int)) < sizeof (int)) + { + error ("read error"); + return retval; + } + error_code=ntohl(nl); + if (write (pollfd[k].fd, &nl, sizeof (int)) < sizeof (int)) + { + error ("write error"); + return retval; + } + error("error occurred in %s\n\tsee %s:/tmp/octave_error-%s_%5d.log for detail",hehe->h_name,hehe->h_name,hehe->h_name,pid ); } + if(pollfd[k].revents&POLLERR){ + error("Error condition - %s",hehe->h_name ); + return retval; + } + if(pollfd[k].revents&POLLHUP){ + error("Hung up - %s",hehe->h_name ); + return retval; + } + if(pollfd[k].revents & POLLNVAL){ + error ("fd not open - %s", hehe->h_name); + return retval; + } } } } @@ -116,13 +143,22 @@ comm[col]='\n'; comm[col+1]='\0'; nl=htonl(col); - write(sock,&nl,sizeof(int)); + if (write(sock,&nl,sizeof(int)) < sizeof (int)) + { + error ("write error"); + return retval; + } count=0; r_len=BUFF_SIZE; while(count <col){ if((col-count) < BUFF_SIZE) r_len=col-count; - count +=write(sock,(comm+count),r_len); + count += (fin = write (sock, (comm + count), r_len)); + if (fin <= 0) + { + error ("write error"); + return retval; + } } // Blocking Execution Modified: trunk/octave-forge/main/parallel/src/sclose.cc =================================================================== --- trunk/octave-forge/main/parallel/src/sclose.cc 2010-08-23 21:02:33 UTC (rev 7562) +++ trunk/octave-forge/main/parallel/src/sclose.cc 2010-08-24 08:13:53 UTC (rev 7563) @@ -66,8 +66,7 @@ for(i=0;i<nsock;i++){ sock=(int)args(0).matrix_value().data()[i+nsock]; pollfd[i].fd=sock; - pollfd[i].events=0; - pollfd[i].events=POLLIN|POLLERR|POLLHUP; + pollfd[i].events=POLLIN; } num=poll(pollfd,nsock,0); @@ -96,6 +95,9 @@ if(pollfd[k].revents&POLLHUP){ error("Hung up - %s",hehe->h_name ); } + if(pollfd[k].revents&POLLNVAL){ + error("fd not open - %s",hehe->h_name ); + } } } } Deleted: trunk/octave-forge/main/parallel/src/swab.h =================================================================== --- trunk/octave-forge/main/parallel/src/swab.h 2010-08-23 21:02:33 UTC (rev 7562) +++ trunk/octave-forge/main/parallel/src/swab.h 2010-08-24 08:13:53 UTC (rev 7563) @@ -1,23 +0,0 @@ -#define swab32(x) \ -({ \ - u_int32_t *__x = (u_int32_t *)(x); \ - ((u_int32_t)( \ - (((u_int32_t)(*__x) & (u_int32_t)0x000000ffUL) << 24) | \ - (((u_int32_t)(*__x) & (u_int32_t)0x0000ff00UL) << 8) | \ - (((u_int32_t)(*__x) & (u_int32_t)0x00ff0000UL) >> 8) | \ - (((u_int32_t)(*__x) & (u_int32_t)0xff000000UL) >> 24) )); \ -}) - -#define swab64(x) \ -({ \ - u_int64_t *__x = (u_int64_t *)(x); \ - ((u_int64_t)( \ - (u_int64_t)(((u_int64_t)(*__x) & (u_int64_t)0x00000000000000ffULL) << 56) | \ - (u_int64_t)(((u_int64_t)(*__x) & (u_int64_t)0x000000000000ff00ULL) << 40) | \ - (u_int64_t)(((u_int64_t)(*__x) & (u_int64_t)0x0000000000ff0000ULL) << 24) | \ - (u_int64_t)(((u_int64_t)(*__x) & (u_int64_t)0x00000000ff000000ULL) << 8) | \ - (u_int64_t)(((u_int64_t)(*__x) & (u_int64_t)0x000000ff00000000ULL) >> 8) | \ - (u_int64_t)(((u_int64_t)(*__x) & (u_int64_t)0x0000ff0000000000ULL) >> 24) | \ - (u_int64_t)(((u_int64_t)(*__x) & (u_int64_t)0x00ff000000000000ULL) >> 40) | \ - (u_int64_t)(((u_int64_t)(*__x) & (u_int64_t)0xff00000000000000ULL) >> 56) )); \ -}) This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <i7...@us...> - 2010-08-24 10:34:42
|
Revision: 7565 http://octave.svn.sourceforge.net/octave/?rev=7565&view=rev Author: i7tiol Date: 2010-08-24 10:34:35 +0000 (Tue, 24 Aug 2010) Log Message: ----------- Remove communication of byte order. Handling of remote errors in send/recv.cc. Modified Paths: -------------- trunk/octave-forge/main/parallel/DESCRIPTION trunk/octave-forge/main/parallel/src/connect.cc trunk/octave-forge/main/parallel/src/pserver.cc trunk/octave-forge/main/parallel/src/recv.cc trunk/octave-forge/main/parallel/src/reval.cc trunk/octave-forge/main/parallel/src/send.cc Modified: trunk/octave-forge/main/parallel/DESCRIPTION =================================================================== --- trunk/octave-forge/main/parallel/DESCRIPTION 2010-08-24 10:29:06 UTC (rev 7564) +++ trunk/octave-forge/main/parallel/DESCRIPTION 2010-08-24 10:34:35 UTC (rev 7565) @@ -1,6 +1,6 @@ Name: Parallel -Version: 2.0.3 -Date: 2010-08-13 +Version: 2.0.4 +Date: 2010-08-24 Author: Hayato Fujiwara and Olaf Till <ola...@un...> Maintainer: Olaf Till <ola...@un...> Title: Parallel Computing. Modified: trunk/octave-forge/main/parallel/src/connect.cc =================================================================== --- trunk/octave-forge/main/parallel/src/connect.cc 2010-08-24 10:29:06 UTC (rev 7564) +++ trunk/octave-forge/main/parallel/src/connect.cc 2010-08-24 10:34:35 UTC (rev 7565) @@ -151,7 +151,10 @@ } if (error_state) - return retval; + { + free (sock_v); + return retval; + } } usleep(100); // why? @@ -230,20 +233,8 @@ result=ntohl(nl); if(result==0){ sock_v[i]=sock; - //recv endian - read_if_no_error(sock,&nl,sizeof(int),error_state); - sock_v[i+2*row]=ntohl(nl); - //send endian -#if defined (__BYTE_ORDER) - nl=htonl(__BYTE_ORDER); -#elif defined (BYTE_ORDER) - nl=htonl(BYTE_ORDER); -#else -# error "can not determine the byte order" -#endif - write_if_no_error(sock,&nl,sizeof(int),error_state); + sock_v[i + 2 * row] = 1; // means "I'm the master" socket_to_oct_iostream (sock); - }else{ close(sock); } @@ -253,7 +244,10 @@ free(host); if (error_state) - return retval; + { + free (sock_v); + return retval; + } } char lf='\n'; @@ -263,21 +257,26 @@ } if (error_state) - return retval; + { + free (sock_v); + return retval; + } + + Matrix mx(row,3); + double *tmp =mx.fortran_vec(); + for (i=0;i<3*row;i++) + tmp[i]=sock_v[i]; + retval = octave_value (mx); + + free (sock_v); + + return retval; } else { print_usage (); return retval; } - - Matrix mx(row,3); - double *tmp =mx.fortran_vec(); - for (i=0;i<3*row;i++) - tmp[i]=sock_v[i]; - retval = octave_value (mx); - - return retval; } Modified: trunk/octave-forge/main/parallel/src/pserver.cc =================================================================== --- trunk/octave-forge/main/parallel/src/pserver.cc 2010-08-24 10:29:06 UTC (rev 7564) +++ trunk/octave-forge/main/parallel/src/pserver.cc 2010-08-24 10:34:35 UTC (rev 7565) @@ -380,18 +380,6 @@ if(pppid==rpppid){ nl=htonl(result); write_or_exit(dasock,&nl,sizeof(int)); - //send endian -#if defined (__BYTE_ORDER) - nl=htonl(__BYTE_ORDER); -#elif defined (BYTE_ORDER) - nl=htonl(BYTE_ORDER); -#else -# error "can not determine the byte order" -#endif - write_or_exit(dasock,&nl,sizeof(int)); - //recv endian - read_or_exit(dasock,&nl,sizeof(int)); - sock_v[j+2*(num_nodes+1)]=ntohl(nl); socket_to_oct_iostream (dasock); break; } // And else? Shouldn't this test have been made @@ -464,18 +452,6 @@ if(result==0){ sock_v[i]=dsock; - //recv endian - read_or_exit(dsock,&nl,sizeof(int)); - sock_v[i+2*(num_nodes+1)]=ntohl(nl); - //send endian -#if defined (__BYTE_ORDER) - nl=htonl(__BYTE_ORDER); -#elif defined (BYTE_ORDER) - nl=htonl(BYTE_ORDER); -#else -# error "can not determine the byte order" -#endif - write_or_exit(dsock,&nl,sizeof(int)); socket_to_oct_iostream (dsock); break; }else{ Modified: trunk/octave-forge/main/parallel/src/recv.cc =================================================================== --- trunk/octave-forge/main/parallel/src/recv.cc 2010-08-24 10:29:06 UTC (rev 7564) +++ trunk/octave-forge/main/parallel/src/recv.cc 2010-08-24 10:34:35 UTC (rev 7565) @@ -1,3 +1,5 @@ +// Copyright (C) 2002 Hayato Fujiwara + // Copyright (C) 2010 Olaf Till <ola...@un...> // This program is free software; you can redistribute it and/or modify @@ -20,6 +22,12 @@ #include <octave/oct-stream.h> #include <octave/oct-map.h> +#include <sys/socket.h> +#include <sys/poll.h> +#include <netinet/in.h> +#include <netdb.h> + + DEFUN_DLD (recv, args, nargout, "recv (socket)\n\ \n\ Receive a variable from the computer specified by the row vector 'socket'.\n") @@ -37,6 +45,67 @@ if (error_state) return retval; + if ((int) socket.data ()[2]) // I'm the master + { + // This is code from original send.cc by Hayato Fujiwara + + int num, pid, sock, nl, error_code; + struct pollfd spollfd; + + sock = (int) socket.data ()[1]; + spollfd.fd = sock; + spollfd.events = POLLIN; + + num = poll (&spollfd, 1, 0); + if (num) + { + if (spollfd.revents && (spollfd.fd !=0)) + { + sockaddr_in r_addr; + struct hostent *hehe; + socklen_t len = sizeof (r_addr); + getpeername (spollfd.fd, (sockaddr*)&r_addr, &len); + hehe = gethostbyaddr ((char *)&r_addr.sin_addr.s_addr, + sizeof(r_addr.sin_addr), AF_INET); + + if (spollfd.revents & POLLIN) + { + pid = getpid (); + if (read (spollfd.fd, &nl, sizeof (int)) < + sizeof (int)) + { + error ("read error"); + } + error_code = ntohl (nl); + if (write (spollfd.fd, &nl, sizeof (int)) < + sizeof (int)) + { + error ("write error"); + } + error ("error occurred in %s\n\tsee " + "%s:/tmp/octave_error-%s_%5d.log for detail", + hehe->h_name, hehe->h_name, hehe->h_name, pid); + } + if (spollfd.revents & POLLERR) + { + error ("Error condition - %s", hehe->h_name); + } + if (spollfd.revents & POLLHUP) + { + error("Hung up - %s", hehe->h_name); + } + if (spollfd.revents & POLLNVAL) + { + error("fd not open - %s", hehe->h_name); + } + } + } + + if (error_state) + return retval; + } + + octave_stream is = octave_stream_list::lookup (octave_value (socket(0, 0)), "recv"); Modified: trunk/octave-forge/main/parallel/src/reval.cc =================================================================== --- trunk/octave-forge/main/parallel/src/reval.cc 2010-08-24 10:29:06 UTC (rev 7564) +++ trunk/octave-forge/main/parallel/src/reval.cc 2010-08-24 10:34:35 UTC (rev 7565) @@ -106,32 +106,34 @@ if (read (pollfd[k].fd, &nl, sizeof (int)) < sizeof (int)) { error ("read error"); - return retval; + break; } error_code=ntohl(nl); if (write (pollfd[k].fd, &nl, sizeof (int)) < sizeof (int)) { error ("write error"); - return retval; + break; } error("error occurred in %s\n\tsee %s:/tmp/octave_error-%s_%5d.log for detail",hehe->h_name,hehe->h_name,hehe->h_name,pid ); } if(pollfd[k].revents&POLLERR){ error("Error condition - %s",hehe->h_name ); - return retval; + break; } if(pollfd[k].revents&POLLHUP){ error("Hung up - %s",hehe->h_name ); - return retval; + break; } if(pollfd[k].revents & POLLNVAL){ error ("fd not open - %s", hehe->h_name); - return retval; + break; } } } } + free (pollfd); + if (error_state) return retval; @@ -175,7 +177,6 @@ print_usage (); return retval; - } /* Modified: trunk/octave-forge/main/parallel/src/send.cc =================================================================== --- trunk/octave-forge/main/parallel/src/send.cc 2010-08-24 10:29:06 UTC (rev 7564) +++ trunk/octave-forge/main/parallel/src/send.cc 2010-08-24 10:34:35 UTC (rev 7565) @@ -1,3 +1,5 @@ +// Copyright (C) 2002 Hayato Fujiwara + // Copyright (C) 2010 Olaf Till <ola...@un...> // This program is free software; you can redistribute it and/or modify @@ -19,6 +21,12 @@ #include <octave/ls-oct-binary.h> #include <octave/oct-stream.h> +#include <sys/socket.h> +#include <sys/poll.h> +#include <netinet/in.h> +#include <netdb.h> + + DEFUN_DLD (send, args, , "send (X, sockets)\n\ \n\ Send the variable 'X' to the computers specified by matrix 'sockets'\n.") @@ -39,6 +47,79 @@ int rows = sockets.rows (); + if ((int) sockets.data ()[2 * rows]) // I'm the master + { + // This is code from original send.cc by Hayato Fujiwara + + int num, pid, sock, nl, error_code; + struct pollfd *pollfd; + pollfd = (struct pollfd *) malloc (rows * sizeof (struct pollfd)); + for(int i = 0; i < rows; i++) + { + sock = (int) sockets.data ()[i + rows]; + pollfd[i].fd = sock; + pollfd[i].events = POLLIN; + } + + num = poll (pollfd, rows, 0); + if (num) + { + for (int k = 0; k < rows; k++) + { + if (pollfd[k].revents && (pollfd[k].fd !=0)) + { + sockaddr_in r_addr; + struct hostent *hehe; + socklen_t len = sizeof (r_addr); + getpeername (pollfd[k].fd, (sockaddr*)&r_addr, &len); + hehe = gethostbyaddr ((char *)&r_addr.sin_addr.s_addr, + sizeof(r_addr.sin_addr), AF_INET); + + if (pollfd[k].revents & POLLIN) + { + pid = getpid (); + if (read (pollfd[k].fd, &nl, sizeof (int)) < + sizeof (int)) + { + error ("read error"); + break; + } + error_code = ntohl (nl); + if (write (pollfd[k].fd, &nl, sizeof (int)) < + sizeof (int)) + { + error ("write error"); + break; + } + error ("error occurred in %s\n\tsee " + "%s:/tmp/octave_error-%s_%5d.log for detail", + hehe->h_name, hehe->h_name, hehe->h_name, pid); + } + if (pollfd[k].revents & POLLERR) + { + error ("Error condition - %s", hehe->h_name); + break; + } + if (pollfd[k].revents & POLLHUP) + { + error("Hung up - %s", hehe->h_name); + break; + } + if (pollfd[k].revents & POLLNVAL) + { + error("fd not open - %s", hehe->h_name); + break; + } + } + } + } + + free (pollfd); + + if (error_state) + return retval; + } + double sid; for (int id = 0; id < rows; id++) This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <i7...@us...> - 2011-01-25 11:20:46
|
Revision: 8072 http://octave.svn.sourceforge.net/octave/?rev=8072&view=rev Author: i7tiol Date: 2011-01-25 11:20:39 +0000 (Tue, 25 Jan 2011) Log Message: ----------- Cleanup warnings. Use passed mkoctfile version. Compatible with Octave > 3.2. Modified Paths: -------------- trunk/octave-forge/main/parallel/DESCRIPTION trunk/octave-forge/main/parallel/src/connect.cc trunk/octave-forge/main/parallel/src/pserver.cc trunk/octave-forge/main/parallel/src/reval.cc trunk/octave-forge/main/parallel/src/sclose.cc Added Paths: ----------- trunk/octave-forge/main/parallel/src/Makefile.in trunk/octave-forge/main/parallel/src/autogen.sh trunk/octave-forge/main/parallel/src/config.h.in trunk/octave-forge/main/parallel/src/configure.ac Removed Paths: ------------- trunk/octave-forge/main/parallel/src/Makefile Modified: trunk/octave-forge/main/parallel/DESCRIPTION =================================================================== --- trunk/octave-forge/main/parallel/DESCRIPTION 2011-01-24 19:22:23 UTC (rev 8071) +++ trunk/octave-forge/main/parallel/DESCRIPTION 2011-01-25 11:20:39 UTC (rev 8072) @@ -1,6 +1,6 @@ Name: Parallel -Version: 2.0.4 -Date: 2010-08-24 +Version: 2.0.5 +Date: 2011-01-25 Author: Hayato Fujiwara and Olaf Till <ola...@un...> Maintainer: Olaf Till <ola...@un...> Title: Parallel Computing. Deleted: trunk/octave-forge/main/parallel/src/Makefile =================================================================== --- trunk/octave-forge/main/parallel/src/Makefile 2011-01-24 19:22:23 UTC (rev 8071) +++ trunk/octave-forge/main/parallel/src/Makefile 2011-01-25 11:20:39 UTC (rev 8072) @@ -1,14 +0,0 @@ -OCTS = sclose.oct connect.oct pserver.oct \ - recv.oct reval.oct send.oct \ - __bw_is_locked__.oct __bw_lock_file__.oct \ - __bw_unlock_file__.oct \ - __bw_prcv__.oct __bw_psend__.oct __internal_exit__.oct \ - select.oct - - -all: $(OCTS) - -%.oct: %.cc - mkoctfile -s $< sock-stream.cc - -clean: ; rm *.o *.oct Added: trunk/octave-forge/main/parallel/src/Makefile.in =================================================================== --- trunk/octave-forge/main/parallel/src/Makefile.in (rev 0) +++ trunk/octave-forge/main/parallel/src/Makefile.in 2011-01-25 11:20:39 UTC (rev 8072) @@ -0,0 +1,14 @@ +OCTS = sclose.oct connect.oct pserver.oct \ + recv.oct reval.oct send.oct \ + __bw_is_locked__.oct __bw_lock_file__.oct \ + __bw_unlock_file__.oct \ + __bw_prcv__.oct __bw_psend__.oct __internal_exit__.oct \ + select.oct + + +all: $(OCTS) + +%.oct: %.cc + @MKOCTFILE@ -s $< sock-stream.cc + +clean: ; rm *.o *.oct Added: trunk/octave-forge/main/parallel/src/autogen.sh =================================================================== --- trunk/octave-forge/main/parallel/src/autogen.sh (rev 0) +++ trunk/octave-forge/main/parallel/src/autogen.sh 2011-01-25 11:20:39 UTC (rev 8072) @@ -0,0 +1,3 @@ +#! /bin/sh + +autoconf Property changes on: trunk/octave-forge/main/parallel/src/autogen.sh ___________________________________________________________________ Added: svn:executable + * Added: trunk/octave-forge/main/parallel/src/config.h.in =================================================================== --- trunk/octave-forge/main/parallel/src/config.h.in (rev 0) +++ trunk/octave-forge/main/parallel/src/config.h.in 2011-01-25 11:20:39 UTC (rev 8072) @@ -0,0 +1,2 @@ +/* Define as 1 if Octave version is <= 3.2.4. */ +#undef OCTAVE_LE_3_2_4 Added: trunk/octave-forge/main/parallel/src/configure.ac =================================================================== --- trunk/octave-forge/main/parallel/src/configure.ac (rev 0) +++ trunk/octave-forge/main/parallel/src/configure.ac 2011-01-25 11:20:39 UTC (rev 8072) @@ -0,0 +1,43 @@ +# -*- Autoconf -*- +# Process this file with autoconf to produce a configure script. + +AC_PREREQ(2.62) +AC_INIT(parallel, 2.0.5, ola...@un...) +AC_CONFIG_SRCDIR([__internal_exit__.cc]) +AC_CONFIG_HEADERS([config.h]) + +# Checks for programs. +AC_PROG_CXX +AC_PROG_CC +AC_CHECK_PROG(MKOCTFILE, mkoctfile, mkoctfile) +AC_CHECK_PROG(OCTAVE, octave, octave) + +# Checks for libraries. + +# Checks for header files. +AC_CHECK_HEADERS([arpa/inet.h fcntl.h netdb.h netinet/in.h sys/socket.h sys/time.h unistd.h]) + +# Checks for typedefs, structures, and compiler characteristics. +AC_HEADER_STDBOOL +AC_TYPE_PID_T +AC_TYPE_SIZE_T +AC_TYPE_SSIZE_T + +# Checks for library functions. +AC_FUNC_ERROR_AT_LINE +AC_FUNC_FORK +AC_FUNC_MALLOC +AC_TYPE_SIGNAL +AC_CHECK_FUNCS([getcwd gethostbyaddr gethostbyname gethostname modf select socket strchr]) + +# Checks for Octave features. +AC_MSG_CHECKING([whether Octave version is <= 3.2.4]) +if $OCTAVE -q --eval "if (compare_versions (version (), \"3.2.4\", \"<=\")) exit (1); endif"; then + AC_MSG_RESULT(no) +else + AC_MSG_RESULT(yes) + AC_DEFINE(OCTAVE_LE_3_2_4, 1, [Define to Octave version <= 3.2.4.]) +fi + +AC_CONFIG_FILES([Makefile]) +AC_OUTPUT Modified: trunk/octave-forge/main/parallel/src/connect.cc =================================================================== --- trunk/octave-forge/main/parallel/src/connect.cc 2011-01-24 19:22:23 UTC (rev 8071) +++ trunk/octave-forge/main/parallel/src/connect.cc 2011-01-25 11:20:39 UTC (rev 8072) @@ -19,6 +19,8 @@ // TODO: error handling is a mess +#include "config.h" + #include <octave/oct.h> #include <oct-env.h> @@ -144,7 +146,11 @@ } free(host); int comm_len; +#ifdef OCTAVE_LE_3_2_4 std::string directory = octave_env::getcwd (); +#else + std::string directory = octave_env::get_current_directory (); +#endif comm_len=directory.length(); nl=htonl(comm_len); write_if_no_error(sock,&nl,sizeof(int),error_state); Modified: trunk/octave-forge/main/parallel/src/pserver.cc =================================================================== --- trunk/octave-forge/main/parallel/src/pserver.cc 2011-01-24 19:22:23 UTC (rev 8071) +++ trunk/octave-forge/main/parallel/src/pserver.cc 2011-01-25 11:20:39 UTC (rev 8072) @@ -36,6 +36,38 @@ #include "sock-stream.h" +#include "config.h" + +#ifndef OCTAVE_LE_3_2_4 + +// Octave > 3.2.4 does not have these in a header file, but in +// sighandlers.cc, and uses gnulib:: for these. So this is copied from +// Octave-3.2.4. +#define BLOCK_SIGNAL(sig, nvar, ovar) \ + do \ + { \ + sigemptyset (&nvar); \ + sigaddset (&nvar, sig); \ + sigemptyset (&ovar); \ + sigprocmask (SIG_BLOCK, &nvar, &ovar); \ + } \ + while (0) + +#if !defined (SIGCHLD) && defined (SIGCLD) +#define SIGCHLD SIGCLD +#endif + +// FIXME: Octave-3.2.4 had HAVE_POSIX_SIGNALS in config.h, newer +// Octave has not (probably due to using gnulib?). We have not this +// test in configure now, but assume HAVE_POSIX_SIGNALS defined. +#define BLOCK_CHILD(nvar, ovar) BLOCK_SIGNAL (SIGCHLD, nvar, ovar) +#define UNBLOCK_CHILD(ovar) sigprocmask (SIG_SETMASK, &ovar, 0) +// #else +// #define BLOCK_CHILD(nvar, ovar) ovar = sigblock (sigmask (SIGCHLD)) +// #define UNBLOCK_CHILD(ovar) sigsetmask (ovar) + +#endif + /* children are not killed on parent exit; for that octave_child_list can not be used and an own SIGCHLD handler is needed */ @@ -75,7 +107,7 @@ std::string s; char dummy; - read(sock,&dummy,sizeof(char)); + read_or_exit (sock, &dummy, sizeof (char)); int p_err,count,r_len,num,fin,nl; struct pollfd *pollfd; @@ -191,16 +223,24 @@ feval ("sighup_dumps_octave_core", octave_value (0), 0); /* Redirect stdin and stdout to /dev/null. */ - freopen("/dev/null", "r", stdin); - freopen("/dev/null", "w", stdout); + if (! freopen ("/dev/null", "r", stdin)) { + perror ("freopen "); + clean_up_and_exit (1); + } + if (! freopen ("/dev/null", "w", stdout)) { + perror ("freopen "); + clean_up_and_exit (1); + } sprintf(errname,"/tmp/octave_error-%s.log",hostname); if(stat(errname,&fstat)==0){ sprintf(bakname,"/tmp/octave_error-%s.bak",hostname); rename(errname,bakname); } - freopen(errname, "w", stderr); - + if (! freopen (errname, "w", stderr)) { + perror ("freopen "); + clean_up_and_exit (1); + } int sock=0,asock=0,dsock=0,dasock=0,pid=0; struct sockaddr_in *addr,rem_addr;; @@ -335,7 +375,10 @@ sprintf(bakname,"/tmp/octave_error-%s_%05d.bak",hostname,pppid); rename(errname,bakname); } - freopen(errname, "w", stderr); + if (! freopen(errname, "w", stderr)) { + perror ("freopen "); + _exit (1); + } for(i=0;i<me;i++){ // recv; Modified: trunk/octave-forge/main/parallel/src/reval.cc =================================================================== --- trunk/octave-forge/main/parallel/src/reval.cc 2011-01-24 19:22:23 UTC (rev 8071) +++ trunk/octave-forge/main/parallel/src/reval.cc 2011-01-25 11:20:39 UTC (rev 8072) @@ -24,7 +24,7 @@ #include "error.h" #include "help.h" #include "oct-map.h" -#include "systime.h" +// #include "systime.h" #include "ov.h" #include "oct-obj.h" #include "utils.h" Modified: trunk/octave-forge/main/parallel/src/sclose.cc =================================================================== --- trunk/octave-forge/main/parallel/src/sclose.cc 2011-01-24 19:22:23 UTC (rev 8071) +++ trunk/octave-forge/main/parallel/src/sclose.cc 2011-01-25 11:20:39 UTC (rev 8072) @@ -25,7 +25,7 @@ #include "error.h" #include "help.h" #include "oct-map.h" -#include "systime.h" +// #include "systime.h" #include "ov.h" #include "oct-obj.h" #include "utils.h" This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <i7...@us...> - 2011-12-23 18:29:52
|
Revision: 9459 http://octave.svn.sourceforge.net/octave/?rev=9459&view=rev Author: i7tiol Date: 2011-12-23 18:29:45 +0000 (Fri, 23 Dec 2011) Log Message: ----------- Changes of coding style. Modified Paths: -------------- trunk/octave-forge/main/parallel/DESCRIPTION trunk/octave-forge/main/parallel/src/config.h.in trunk/octave-forge/main/parallel/src/connect.cc trunk/octave-forge/main/parallel/src/pserver.cc trunk/octave-forge/main/parallel/src/recv.cc trunk/octave-forge/main/parallel/src/reval.cc trunk/octave-forge/main/parallel/src/sclose.cc trunk/octave-forge/main/parallel/src/send.cc Modified: trunk/octave-forge/main/parallel/DESCRIPTION =================================================================== --- trunk/octave-forge/main/parallel/DESCRIPTION 2011-12-23 15:58:06 UTC (rev 9458) +++ trunk/octave-forge/main/parallel/DESCRIPTION 2011-12-23 18:29:45 UTC (rev 9459) @@ -1,6 +1,6 @@ Name: Parallel Version: 2.0.5 -Date: 2011-01-25 +Date: 2011-12-23 Author: Hayato Fujiwara and Olaf Till <ola...@un...> Maintainer: Olaf Till <ola...@un...> Title: Parallel Computing. Modified: trunk/octave-forge/main/parallel/src/config.h.in =================================================================== --- trunk/octave-forge/main/parallel/src/config.h.in 2011-12-23 15:58:06 UTC (rev 9458) +++ trunk/octave-forge/main/parallel/src/config.h.in 2011-12-23 18:29:45 UTC (rev 9459) @@ -1,2 +1,5 @@ /* Define as 1 if Octave version is <= 3.2.4. */ #undef OCTAVE_LE_3_2_4 + +/* some manual configuration */ +#define N_CONNECT_RETRIES 10 Modified: trunk/octave-forge/main/parallel/src/connect.cc =================================================================== --- trunk/octave-forge/main/parallel/src/connect.cc 2011-12-23 15:58:06 UTC (rev 9458) +++ trunk/octave-forge/main/parallel/src/connect.cc 2011-12-23 18:29:45 UTC (rev 9459) @@ -32,8 +32,6 @@ #include "sock-stream.h" -// COMM - static void read_if_no_error (int fd, void *buf, size_t count, int est) { @@ -56,196 +54,201 @@ Connect hosts and return sockets.") { octave_value retval; - int sock=0,col=0,row=0,i,j,len, not_connected; - double *sock_v=0; + int sock = 0, cols = 0, rows = 0, i, j, len, not_connected; + double *sock_v = 0; if (args.length () == 1) { - int pid=0,nl; + int pid = 0, nl; struct sockaddr_in *addr; struct hostent *he; - octave_value hosts=args(0); - charMatrix cm=hosts.char_matrix_value(); - char *host,*pt; // ,myname[16]; + octave_value hosts = args(0); + charMatrix cm = hosts.char_matrix_value (); + char *host, *pt; - errno=0; - // gethostname(myname,15); - row= cm.rows(); - col= cm.columns(); - cm= cm.transpose(); + errno = 0; + rows = cm.rows (); + cols = cm.columns (); + cm = cm.transpose (); - sock_v=(double *)calloc(3,row*sizeof(double)); + sock_v = (double *) calloc (3, rows * sizeof (double)); - for(i=1;i<row;i++){ - host=(char *)calloc(1,col+1); - strncpy(host,&cm.data()[col*i],col); - pt=strchr(host,' '); - if(pt==NULL) - host[col]='\0'; - else - *pt='\0'; + for (i = 1; i < rows; i++) + { + host = (char *) calloc (1, cols + 1); + strncpy (host, &cm.data ()[cols * i], cols); + pt = strchr (host, ' '); + if (pt == NULL) + host[cols] = '\0'; + else + *pt = '\0'; - sock=socket(PF_INET,SOCK_STREAM,0); - if(sock==-1){ - error("socket error "); - } - - addr=(struct sockaddr_in *) calloc(1,sizeof(struct sockaddr_in)); - - addr->sin_family=AF_INET; - addr->sin_port=htons(12502); - he=gethostbyname(host); - if(he == NULL){ - error("Unknown host %s",host); - } - memcpy(&addr->sin_addr,he->h_addr_list[0],he->h_length); - - not_connected = 1; - for(j=0;j<10;j++){ - if((not_connected = - connect(sock,(struct sockaddr *)addr,sizeof(*addr)))==0){ - break; - }else if(errno!=ECONNREFUSED){ - error("connect error "); - }else { - usleep(5000); - } - } + sock = socket (PF_INET, SOCK_STREAM, 0); + if (sock == -1) + { + error ("socket error "); + } - free(addr); - free(host); + addr=(struct sockaddr_in *) calloc(1,sizeof(struct sockaddr_in)); - if(not_connected) + addr->sin_family = AF_INET; + addr->sin_port = htons (12502); + he = gethostbyname (host); + if(he == NULL) + { + error ("Unknown host %s", host); + } + memcpy (&addr->sin_addr, he->h_addr_list[0], he->h_length); - error("Unable to connect to %s: Connection refused",host); + not_connected = 1; + for (j = 0; j < N_CONNECT_RETRIES; j++) + { + if((not_connected = connect (sock, (struct sockaddr *) addr, + sizeof (*addr))) == 0) + break; + else if (errno != ECONNREFUSED) + error("connect error "); + else + usleep(5000); + } - else - { - sock_v[i+row]=sock; - - int num_nodes=row-1; + free (addr); + free (host); - pid=getpid(); - nl=htonl(num_nodes); - write_if_no_error(sock,&nl,sizeof(int),error_state); - nl=htonl(i); - write_if_no_error(sock,&nl,sizeof(int),error_state); - nl=htonl(pid); - write_if_no_error(sock,&nl,sizeof(int),error_state); - host=(char *)calloc(128,sizeof(char)); - for(j=0;j<row;j++){ - strncpy(host,&cm.data()[col*j],col); - pt=strchr(host,' '); - if(pt==NULL) - host[col]='\0'; - else - *pt='\0'; - len=strlen(host)+1; - nl=htonl(len); - write_if_no_error(sock,&nl,sizeof(int),error_state); - write_if_no_error(sock,host,len,error_state); - } - free(host); - int comm_len; + if (not_connected) + + error ("Unable to connect to %s: Connection refused", host); + + else + { + sock_v[i + rows] = sock; + + int num_nodes = rows - 1; + + pid = getpid (); + nl = htonl (num_nodes); + write_if_no_error (sock, &nl, sizeof (int), error_state); + nl = htonl (i); + write_if_no_error (sock, &nl, sizeof (int), error_state); + nl = htonl (pid); + write_if_no_error (sock, &nl, sizeof (int), error_state); + host = (char *) calloc (128, sizeof (char)); + for(j = 0; j < rows; j++) + { + strncpy (host, &cm.data ()[cols * j], cols); + pt = strchr (host, ' '); + if(pt == NULL) + host[cols] = '\0'; + else + *pt='\0'; + len = strlen (host) + 1; + nl = htonl (len); + write_if_no_error (sock, &nl, sizeof (int), error_state); + write_if_no_error (sock, host, len, error_state); + } + free (host); + int comm_len; #ifdef OCTAVE_LE_3_2_4 - std::string directory = octave_env::getcwd (); + std::string directory = octave_env::getcwd (); #else - std::string directory = octave_env::get_current_directory (); + std::string directory = octave_env::get_current_directory (); #endif - comm_len=directory.length(); - nl=htonl(comm_len); - write_if_no_error(sock,&nl,sizeof(int),error_state); - write_if_no_error(sock,directory.c_str(),comm_len,error_state); - } + comm_len = directory.length (); + nl = htonl (comm_len); + write_if_no_error (sock, &nl, sizeof (int), error_state); + write_if_no_error (sock, directory.c_str (), comm_len, + error_state); + } - if (error_state) - { - free (sock_v); - return retval; - } - } + if (error_state) + { + free (sock_v); + return retval; + } + } - usleep(100); // why? + usleep (100); // why? - for(i=1;i<row;i++){ - - host=(char *)calloc(1,col+1); - - strncpy(host,&cm.data()[col*i],col); - pt=strchr(host,' '); - if(pt==NULL) - host[col]='\0'; - else - *pt='\0'; - - sock=socket(PF_INET,SOCK_STREAM,0); - if(sock==-1){ - perror("socket : "); - exit(-1); - } - - addr=(struct sockaddr_in *) calloc(1,sizeof(struct sockaddr_in)); - - addr->sin_family=AF_INET; - addr->sin_port=htons(12501); - he=gethostbyname(host); - if(he == NULL){ - error("Unknown host %s",host); - } - memcpy(&addr->sin_addr,he->h_addr_list[0],he->h_length); + for (i = 1; i < rows; i++) + { + host = (char *) calloc (1, cols + 1); + strncpy (host, &cm.data ()[cols * i], cols); + pt = strchr (host, ' '); + if (pt == NULL) + host[cols] = '\0'; + else + *pt = '\0'; + + sock = socket (PF_INET, SOCK_STREAM, 0); + if (sock == -1) + { + perror ("socket : "); + exit(-1); + } + + addr = (struct sockaddr_in *) calloc (1, sizeof (struct sockaddr_in)); + + addr->sin_family = AF_INET; + addr->sin_port = htons (12501); + he = gethostbyname (host); + if (he == NULL) + error ("Unknown host %s", host); + memcpy (&addr->sin_addr, he->h_addr_list[0], he->h_length); + not_connected = 1; - for(j=0;j<10;j++){ - if((not_connected = - connect(sock,(struct sockaddr *)addr,sizeof(*addr)))==0){ - break; - }else if(errno!=ECONNREFUSED){ - perror("connect error "); - }else { - usleep(5000); + for (j = 0; j < N_CONNECT_RETRIES; j++) + { + if((not_connected = + connect (sock, (struct sockaddr *) addr, sizeof (*addr))) == 0) + break; + else if (errno != ECONNREFUSED) + perror("connect error "); + else + usleep(5000); } - } - if(not_connected) + if (not_connected) - error("Unable to connect to %s: Connection refused",host); + error ("Unable to connect to %s: Connection refused", host); else { int bufsize = BUFF_SIZE; socklen_t ol; - ol=sizeof(bufsize); - setsockopt(sock,SOL_SOCKET,SO_SNDBUF,&bufsize,ol); - setsockopt(sock,SOL_SOCKET,SO_RCVBUF,&bufsize,ol); - bufsize=1; - ol=sizeof(bufsize); - setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,&bufsize,ol); + ol = sizeof (bufsize); + setsockopt (sock, SOL_SOCKET, SO_SNDBUF, &bufsize, ol); + setsockopt (sock, SOL_SOCKET, SO_RCVBUF, &bufsize, ol); + bufsize = 1; + ol = sizeof (bufsize); + setsockopt (sock, SOL_SOCKET, SO_REUSEADDR, &bufsize, ol); - int len=0,result=0;; - //send pppid - nl=htonl(pid); - write_if_no_error(sock,&nl,sizeof(int),error_state); - //send name size - strncpy(host,cm.data(),col); - pt=strchr(host,' '); - if(pt==NULL) - host[col]='\0'; + int len = 0, result = 0; + // send pppid + nl = htonl (pid); + write_if_no_error (sock, &nl, sizeof (int), error_state); + // send name size + strncpy (host, cm.data (), cols); + pt = strchr (host, ' '); + if (pt == NULL) + host[cols] = '\0'; else - *pt='\0'; - len=strlen(host); - nl=htonl(len); - write_if_no_error(sock,&nl,sizeof(int),error_state); - //send name - write_if_no_error(sock,host,len+1,error_state); - //recv result code - read_if_no_error(sock,&nl,sizeof(int),error_state); - result=ntohl(nl); - if(result==0){ - sock_v[i]=sock; - sock_v[i + 2 * row] = 1; // means "I'm the master" - socket_to_oct_iostream (sock); - }else{ + *pt = '\0'; + len = strlen (host); + nl = htonl (len); + write_if_no_error (sock, &nl, sizeof (int), error_state); + // send name + write_if_no_error (sock, host, len + 1, error_state); + // recv result code + read_if_no_error (sock, &nl, sizeof (int), error_state); + result = ntohl (nl); + if (result == 0) + { + sock_v[i] = sock; + sock_v[i + 2 * rows] = 1; // means "I'm the master" + socket_to_oct_iostream (sock); + } + else close(sock); - } } free(addr); @@ -256,13 +259,12 @@ free (sock_v); return retval; } - } + } - char lf='\n'; - for(i=1;i<row;i++){ - write_if_no_error((int)sock_v[i+row],&lf,sizeof(char),error_state); - // cout << i+row <<endl; - } + char lf = '\n'; + for (i = 1; i < rows; i++) + write_if_no_error ((int) sock_v[i + rows], &lf, sizeof (char), + error_state); if (error_state) { @@ -270,10 +272,10 @@ return retval; } - Matrix mx(row,3); - double *tmp =mx.fortran_vec(); - for (i=0;i<3*row;i++) - tmp[i]=sock_v[i]; + Matrix mx (rows, 3); + double *tmp = mx.fortran_vec (); + for (i = 0; i < 3 * rows; i++) + tmp[i] = sock_v[i]; retval = octave_value (mx); free (sock_v); Modified: trunk/octave-forge/main/parallel/src/pserver.cc =================================================================== --- trunk/octave-forge/main/parallel/src/pserver.cc 2011-12-23 15:58:06 UTC (rev 9458) +++ trunk/octave-forge/main/parallel/src/pserver.cc 2011-12-23 18:29:45 UTC (rev 9459) @@ -73,6 +73,12 @@ can not be used and an own SIGCHLD handler is needed */ static +bool pserver_child_event_handler (pid_t pid, int ev) +{ + return 1; // remove child from octave_child_list +} + +static void read_or_exit (int fd, void *buf, size_t count) { if (read (fd, buf, count) < (ssize_t)count) @@ -92,74 +98,74 @@ } } -static -bool pserver_child_event_handler (pid_t pid, int ev) -{ - return 1; // remove child from octave_child_list -} - void reval_loop (int sock) { // The big loop. - int len=0; + int len = 0; char *ev_str; std::string s; char dummy; read_or_exit (sock, &dummy, sizeof (char)); - int p_err,count,r_len,num,fin,nl; + int p_err, count, r_len, num, fin, nl; struct pollfd *pollfd; - - pollfd=(struct pollfd *)malloc(sizeof(struct pollfd)); - pollfd[0].fd=sock; - pollfd[0].events=POLLIN; + pollfd = (struct pollfd *) malloc (sizeof (struct pollfd)); + pollfd[0].fd = sock; + pollfd[0].events = POLLIN; + while (true) // function does not return { - pollfd[0].revents=0; - num=poll(pollfd,1,-1); - if(num){ - if(pollfd[0].revents && (pollfd[0].fd !=0)){ - if(pollfd[0].revents&POLLIN){ - read_or_exit(sock,&nl,sizeof(int)); - len=ntohl(nl); - } - if(pollfd[0].revents&POLLERR){ - std::cerr <<"Error condition "<<std::endl; - _exit (POLLERR); - } - if(pollfd[0].revents&POLLHUP){ - std::cerr <<"Hung up "<<std::endl; - _exit (POLLHUP); - } - if(pollfd[0].revents&POLLNVAL){ - std::cerr <<"fd not open "<<std::endl; - _exit (POLLNVAL); - } - } - } - ev_str=new char[len+1]; - count=0; - r_len=BUFF_SIZE; - while(count < len){ - if((len-count) < BUFF_SIZE) - r_len=len-count; - count += (fin = read (sock, (ev_str + count), r_len)); - if (fin <= 0) - { - error ("read error"); - _exit (1); - } - } - // read(sock,ev_str,len); - ev_str[len]='\0'; + pollfd[0].revents = 0; + num = poll (pollfd, 1, -1); + if (num) + { + if (pollfd[0].revents && (pollfd[0].fd != 0)) + { + if (pollfd[0].revents & POLLIN) + { + read_or_exit (sock, &nl, sizeof(int)); + len = ntohl (nl); + } + if (pollfd[0].revents & POLLERR) + { + std::cerr << "Error condition " << std::endl; + _exit (POLLERR); + } + if (pollfd[0].revents & POLLHUP) + { + std::cerr << "Hung up " << std::endl; + _exit (POLLHUP); + } + if (pollfd[0].revents & POLLNVAL) + { + std::cerr << "fd not open " << std::endl; + _exit (POLLNVAL); + } + } + } + ev_str = new char[len + 1]; + count = 0; + r_len = BUFF_SIZE; + while (count < len) + { + if ((len - count) < BUFF_SIZE) + r_len = len - count; + count += (fin = read (sock, (ev_str + count), r_len)); + if (fin <= 0) + { + error ("read error"); + _exit (1); + } + } + ev_str[len] = '\0'; - s=(std::string)ev_str; - eval_string(s,false,p_err,0); + s = (std::string) ev_str; + eval_string (s, false, p_err, 0); - delete(ev_str); + delete (ev_str); nl = 0; if (error_state) { @@ -190,142 +196,156 @@ \n\ Connect hosts and return sockets.") { - FILE *pidfile=0; - int ppid,len=118; - char hostname[120],pidname[128],errname[128],bakname[128]; + FILE *pidfile = 0; + int ppid, len = 118; + char hostname[120], pidname[128], errname[128], bakname[128]; struct stat fstat; - - gethostname(hostname,len); - sprintf(pidname,"/tmp/.octave-%s.pid",hostname); - if(stat(pidname,&fstat)==0){ - std::cerr << "octave : "<<hostname<<": server already running"<<std::endl; - clean_up_and_exit (1); - } + gethostname (hostname, len); + sprintf (pidname, "/tmp/.octave-%s.pid", hostname); + if (stat (pidname, &fstat) == 0) + { + std::cerr << "octave : " << hostname << ": server already running" + << std::endl; + clean_up_and_exit (1); + } + // initialize exit function feval ("__pserver_exit__", octave_value (hostname), 0); if (fork()) - clean_up_and_exit(0); - + clean_up_and_exit (0); + // register exit function feval ("atexit", octave_value ("__pserver_exit__"), 0); /* Touch lock file, mark for deletion. */ - ppid=getpid(); + ppid = getpid (); mark_for_deletion (pidname); pidfile = fopen (pidname, "w"); - fprintf(pidfile,"%d\n",ppid); - fclose(pidfile); - std::cout <<pidname<<std::endl; + fprintf (pidfile, "%d\n", ppid); + fclose (pidfile); + std::cout << pidname << std::endl; // avoid dumping octave_core if killed by a signal feval ("sigterm_dumps_octave_core", octave_value (0), 0); feval ("sighup_dumps_octave_core", octave_value (0), 0); /* Redirect stdin and stdout to /dev/null. */ - if (! freopen ("/dev/null", "r", stdin)) { - perror ("freopen "); - clean_up_and_exit (1); - } - if (! freopen ("/dev/null", "w", stdout)) { - perror ("freopen "); - clean_up_and_exit (1); - } + if (! freopen ("/dev/null", "r", stdin)) + { + perror ("freopen "); + clean_up_and_exit (1); + } + if (! freopen ("/dev/null", "w", stdout)) + { + perror ("freopen "); + clean_up_and_exit (1); + } - sprintf(errname,"/tmp/octave_error-%s.log",hostname); - if(stat(errname,&fstat)==0){ - sprintf(bakname,"/tmp/octave_error-%s.bak",hostname); - rename(errname,bakname); - } - if (! freopen (errname, "w", stderr)) { - perror ("freopen "); - clean_up_and_exit (1); - } + sprintf (errname, "/tmp/octave_error-%s.log", hostname); + if (stat (errname, &fstat) == 0) + { + sprintf (bakname, "/tmp/octave_error-%s.bak", hostname); + rename (errname, bakname); + } + if (! freopen (errname, "w", stderr)) + { + perror ("freopen "); + clean_up_and_exit (1); + } - int sock=0,asock=0,dsock=0,dasock=0,pid=0; - struct sockaddr_in *addr,rem_addr;; + int sock = 0, asock = 0, dsock = 0, dasock = 0, pid = 0; + struct sockaddr_in *addr, rem_addr;; - addr=(struct sockaddr_in *) calloc(1,sizeof(struct sockaddr_in)); - - sock=socket(PF_INET,SOCK_STREAM,0); - if(sock==-1){ - perror("socket "); - int len=118; - char hostname[120],pidname[128]; - gethostname(hostname,len); - sprintf(pidname,".octave-%s.pid",hostname); - remove (pidname); - close_files (); - clean_up_and_exit (1); - } + addr = (struct sockaddr_in *) calloc (1, sizeof (struct sockaddr_in)); - addr->sin_family=AF_INET; - addr->sin_port=htons(12502); - addr->sin_addr.s_addr=INADDR_ANY; - - if(bind(sock,(struct sockaddr *) addr,sizeof(*addr))!=0){ - perror("bind "); - int len=118; - char hostname[120],pidname[128]; - gethostname(hostname,len); - sprintf(pidname,".octave-%s.pid",hostname); - remove (pidname); - close_files (); - clean_up_and_exit (1); - } - free(addr); + sock = socket (PF_INET, SOCK_STREAM, 0); + if (sock == -1) + { + perror ("socket "); + int len = 118; + char hostname[120], pidname[128]; + gethostname (hostname, len); + sprintf (pidname, ".octave-%s.pid", hostname); + remove (pidname); + close_files (); + clean_up_and_exit (1); + } - if(listen(sock,1)!=0){ - perror("listen "); - clean_up_and_exit (1); - } + addr->sin_family = AF_INET; + addr->sin_port = htons (12502); + addr->sin_addr.s_addr = INADDR_ANY; + if (bind (sock, (struct sockaddr *) addr, sizeof (*addr))) + { + perror ("bind "); + int len = 118; + char hostname[120], pidname[128]; + gethostname (hostname, len); + sprintf (pidname, ".octave-%s.pid", hostname); + remove (pidname); + close_files (); + clean_up_and_exit (1); + } + free (addr); - dsock=socket(PF_INET,SOCK_STREAM,0); - if(dsock==-1){ - perror("socket : "); - clean_up_and_exit(-1); - } - - addr=(struct sockaddr_in *) calloc(1,sizeof(struct sockaddr_in)); - - addr->sin_family=AF_INET; - addr->sin_port=htons(12501); - addr->sin_addr.s_addr=INADDR_ANY; - - - if(bind(dsock,(struct sockaddr *) addr,sizeof(*addr))!=0){ - perror("bind : "); - clean_up_and_exit(-1); - } - if(listen(dsock,SOMAXCONN)!=0){ - perror("listen : "); - clean_up_and_exit(-1); - } - free(addr); - int param=1; - socklen_t ol=sizeof(param); - setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,¶m,ol); - setsockopt(dsock,SOL_SOCKET,SO_REUSEADDR,¶m,ol); + if (listen (sock, 1)) + { + perror ("listen "); + clean_up_and_exit (1); + } - int val=1,num_nodes,me,i,j=0,pppid=0,rpppid=0,result=0,nl; + + dsock = socket (PF_INET, SOCK_STREAM, 0); + if (dsock == -1) + { + perror ("socket : "); + clean_up_and_exit (-1); + } + + addr = (struct sockaddr_in *) calloc (1, sizeof (struct sockaddr_in)); + + addr->sin_family = AF_INET; + addr->sin_port = htons (12501); + addr->sin_addr.s_addr = INADDR_ANY; + + + if (bind (dsock, (struct sockaddr *) addr, sizeof (*addr))) + { + perror ("bind : "); + clean_up_and_exit (-1); + } + if (listen (dsock, SOMAXCONN)) + { + perror ("listen : "); + clean_up_and_exit (-1); + } + free (addr); + int param = 1; + socklen_t ol = sizeof (param); + setsockopt (sock, SOL_SOCKET, SO_REUSEADDR, ¶m, ol); + setsockopt (dsock, SOL_SOCKET, SO_REUSEADDR, ¶m, ol); + + int val = 1, num_nodes, me, i, j = 0, pppid = 0, rpppid = 0, result = 0, + nl; int *sock_v; char **host_list,rem_name[128]; struct hostent *he; - ol=sizeof(val); + ol = sizeof (val); for(;;) { - asock=accept(sock,0,0); - if(asock==-1){ - perror("accept com"); - clean_up_and_exit (1); - } + asock = accept (sock, 0, 0); + if (asock == -1) + { + perror ("accept com"); + clean_up_and_exit (1); + } /* Normal production daemon. Fork, and have the child process the connection. The parent continues listening. */ - + // remove non-existing children from octave_child_list OCTAVE_QUIT; @@ -338,212 +358,229 @@ // restores all signals to state before BLOCK_CHILD #define RESTORE_SIGNALS(ovar) UNBLOCK_CHILD(ovar) - if((pid=fork())==-1) + if ((pid = fork ()) == -1) { - perror("fork "); + perror ("fork "); clean_up_and_exit (1); } - else if(pid==0) + else if (pid == 0) { - close(sock); - signal(SIGCHLD,SIG_DFL); - signal(SIGTERM,SIG_DFL); - signal(SIGQUIT,SIG_DFL); + close (sock); + signal (SIGCHLD, SIG_DFL); + signal (SIGTERM, SIG_DFL); + signal (SIGQUIT, SIG_DFL); RESTORE_SIGNALS (oset); - val=1; - ol=sizeof(val); - setsockopt(asock,SOL_SOCKET,SO_REUSEADDR,&val,ol); - - read_or_exit(asock,&nl,sizeof(int)); - num_nodes=ntohl(nl); - read_or_exit(asock,&nl,sizeof(int)); - me=ntohl(nl); - read_or_exit(asock,&nl,sizeof(int)); - pppid=ntohl(nl); - sock_v=(int *)calloc((num_nodes+1)*3,sizeof(int)); - host_list=(char **)calloc(num_nodes+1,sizeof(char *)); - for(i=0;i<=num_nodes;i++){ - read_or_exit(asock,&nl,sizeof(int)); - len=ntohl(nl); - host_list[i]=(char *)calloc(len,sizeof(char)); - read_or_exit(asock,host_list[i],len); - } + val = 1; + ol = sizeof (val); + setsockopt (asock, SOL_SOCKET, SO_REUSEADDR, &val, ol); - sprintf(errname,"/tmp/octave_error-%s_%05d.log",hostname,pppid); - if(stat(errname,&fstat)==0){ - sprintf(bakname,"/tmp/octave_error-%s_%05d.bak",hostname,pppid); - rename(errname,bakname); - } - if (! freopen(errname, "w", stderr)) { - perror ("freopen "); - _exit (1); - } + read_or_exit (asock, &nl, sizeof (int)); + num_nodes = ntohl (nl); + read_or_exit (asock, &nl, sizeof (int)); + me = ntohl (nl); + read_or_exit (asock, &nl, sizeof (int)); + pppid = ntohl (nl); + sock_v = (int *) calloc ((num_nodes + 1) * 3, sizeof (int)); + host_list = (char **) calloc (num_nodes + 1, sizeof (char *)); + for (i = 0; i <= num_nodes; i++) + { + read_or_exit (asock, &nl, sizeof (int)); + len = ntohl (nl); + host_list[i] = (char *) calloc (len, sizeof (char)); + read_or_exit (asock, host_list[i], len); + } - for(i=0;i<me;i++){ - // recv; - - len=sizeof(rem_addr); + sprintf (errname, "/tmp/octave_error-%s_%05d.log", hostname, + pppid); + if (stat (errname, &fstat) == 0) + { + sprintf (bakname, "/tmp/octave_error-%s_%05d.bak", hostname, + pppid); + rename (errname, bakname); + } + if (! freopen (errname, "w", stderr)) + { + perror ("freopen "); + _exit (1); + } - while(1){ - dasock=accept(dsock,(sockaddr *)&rem_addr,(socklen_t *)&len); - if(dasock==-1){ - perror("accept dat "); - _exit(-1); - } - int bufsize=BUFF_SIZE; - socklen_t ol; - ol=sizeof(bufsize); - setsockopt(dasock,SOL_SOCKET,SO_SNDBUF,&bufsize,ol); - setsockopt(dasock,SOL_SOCKET,SO_RCVBUF,&bufsize,ol); - bufsize=1; - ol=sizeof(bufsize); - setsockopt(dasock,SOL_SOCKET,SO_REUSEADDR,&bufsize,ol); - - //recv pppid (of connecting process at master) - read_or_exit(dasock,&nl,sizeof(int)); - rpppid=ntohl(nl); - //recv name size - read_or_exit(dasock,&nl,sizeof(int)); - len=ntohl(nl); - //recv name - read_or_exit(dasock,rem_name,len+1); - rem_name[len]='\0'; + for (i = 0; i < me; i++) + { + // recv; - for(j=0;j<me;j++){ - if(strcmp(rem_name,host_list[j])==0){ - sock_v[j]=dasock; - result=0; - break; - }else{ - result=-1; + len=sizeof(rem_addr); + + while (1) + { + dasock = accept (dsock, (sockaddr *) &rem_addr, + (socklen_t *) &len); + if (dasock == -1) + { + perror("accept dat "); + _exit(-1); + } + int bufsize = BUFF_SIZE; + socklen_t ol; + ol = sizeof (bufsize); + setsockopt (dasock, SOL_SOCKET, SO_SNDBUF, &bufsize, ol); + setsockopt (dasock, SOL_SOCKET, SO_RCVBUF, &bufsize, ol); + bufsize = 1; + ol = sizeof (bufsize); + setsockopt (dasock, SOL_SOCKET, SO_REUSEADDR, &bufsize, + ol); + + // recv pppid (of connecting process at master) + read_or_exit (dasock, &nl, sizeof (int)); + rpppid = ntohl (nl); + // recv name size + read_or_exit (dasock, &nl, sizeof (int)); + len = ntohl (nl); + // recv name + read_or_exit (dasock, rem_name, len + 1); + rem_name [len] = '\0'; + + for (j = 0; j < me; j++) + if (strcmp (rem_name, host_list[j]) == 0) + { + sock_v[j] = dasock; + result = 0; + break; + } + else + result=-1; + // send result code + if (result == 0) + { + if (pppid == rpppid) + { + nl = htonl (result); + write_or_exit (dasock, &nl, sizeof (int)); + socket_to_oct_iostream (dasock); + break; + } + // And else? Shouldn't this test have been + // made before? What is the policy if a + // different process at the master meddles in? + } + else + { + result = -1; + nl = htonl (result); + write_or_exit (dasock, &nl, sizeof (int)); + close (dasock); + sleep (1); + } } - } - //send result code - if(result==0){ - if(pppid==rpppid){ - nl=htonl(result); - write_or_exit(dasock,&nl,sizeof(int)); - socket_to_oct_iostream (dasock); - break; - } // And else? Shouldn't this test have been made - // before? What is the policy if a different - // process at the master meddles in? - }else{ - result=-1; - nl=htonl(result); - write_or_exit(dasock,&nl,sizeof(int)); - close(dasock); - sleep(1); - } + if (error_state) + _exit (-1); } - if (error_state) - _exit (-1); - } - // close(dsock); - //me - errno=0; - for(i=(me+1);i<=num_nodes;i++){ - dsock=-1; - // connect; - dsock=socket(PF_INET,SOCK_STREAM,0); - if(dsock==-1){ - perror("socket : "); - _exit(-1); - } - addr=(struct sockaddr_in *) calloc(1,sizeof(struct sockaddr_in)); - - addr->sin_family=AF_INET; - addr->sin_port=htons(12501); - he=gethostbyname(host_list[i]); - if(he == NULL){ - error("Unknown host %s",host_list[i]); - } - memcpy(&addr->sin_addr,he->h_addr_list[0],he->h_length); - while(1){ - for(j=0;j<10;j++){ - if(connect(dsock,(struct sockaddr *)addr,sizeof(*addr))==0){ - break; - }else if(errno!=ECONNREFUSED){ - perror("connect : "); + errno = 0; + + for (i = me + 1; i <= num_nodes; i++) + { + dsock = -1; + // connect; + dsock = socket (PF_INET, SOCK_STREAM, 0); + if (dsock == -1) + { + perror ("socket : "); _exit(-1); - }else { - usleep(5000); } - } - int bufsize=BUFF_SIZE; - socklen_t ol; - ol=sizeof(bufsize); - setsockopt(dsock,SOL_SOCKET,SO_SNDBUF,&bufsize,ol); - setsockopt(dsock,SOL_SOCKET,SO_RCVBUF,&bufsize,ol); - bufsize=1; - ol=sizeof(bufsize); - setsockopt(dsock,SOL_SOCKET,SO_REUSEADDR,&bufsize,ol); - - //send pppid - nl=htonl(pppid); - write_or_exit(dsock,&nl,sizeof(int)); - //send name size - len=strlen(host_list[me]); - nl=htonl(len); - write_or_exit(dsock,&nl,sizeof(int)); - //send name - write_or_exit(dsock,host_list[me],len+1); - //recv result code - read_or_exit(dsock,&nl,sizeof(int)); - result=ntohl(nl); + addr = (struct sockaddr_in *) + calloc (1, sizeof (struct sockaddr_in)); - if(result==0){ - sock_v[i]=dsock; - socket_to_oct_iostream (dsock); - break; - }else{ - close(dsock); - } + addr->sin_family = AF_INET; + addr->sin_port = htons (12501); + he = gethostbyname (host_list[i]); + if (he == NULL) + error("Unknown host %s",host_list[i]); + memcpy (&addr->sin_addr, he->h_addr_list[0], he->h_length); + while (1) + { + for (j = 0; j < N_CONNECT_RETRIES; j++) + if (connect (dsock, (struct sockaddr *) addr, + sizeof (*addr))==0) + break; + else if (errno != ECONNREFUSED) + { + perror ("connect : "); + _exit(-1); + } + else + usleep(5000); + int bufsize = BUFF_SIZE; + socklen_t ol; + ol = sizeof (bufsize); + setsockopt (dsock, SOL_SOCKET, SO_SNDBUF, &bufsize, ol); + setsockopt (dsock, SOL_SOCKET, SO_RCVBUF, &bufsize, ol); + bufsize = 1; + ol = sizeof (bufsize); + setsockopt (dsock, SOL_SOCKET, SO_REUSEADDR, &bufsize, + ol); + + // send pppid + nl = htonl (pppid); + write_or_exit (dsock, &nl, sizeof (int)); + // send name size + len = strlen (host_list[me]); + nl = htonl (len); + write_or_exit (dsock, &nl, sizeof (int)); + // send name + write_or_exit (dsock, host_list[me], len + 1); + // recv result code + read_or_exit (dsock, &nl, sizeof (int)); + result = ntohl (nl); + + if (result == 0) + { + sock_v[i] = dsock; + socket_to_oct_iostream (dsock); + break; + } + else + close (dsock); + } + free (addr); + if (error_state) + _exit (-1); } - free(addr); - if (error_state) - _exit (-1); - } - for(i=0;i<=num_nodes;i++){ - free(host_list[i]); - } - free(host_list); + for (i = 0; i <= num_nodes; i++) + free (host_list[i]); + free (host_list); char * s; int stat; - s=(char *)calloc(32,sizeof(char)); - sprintf(s,"sockets=zeros(%d,3)",num_nodes+1); - eval_string(std::string(s),true,stat); - for(i=0;i<=num_nodes;i++){ - sprintf(s,"sockets(%d,:)=[%d,0,%d]",i+1,sock_v[i],sock_v[i+2*(num_nodes+1)]); - eval_string(std::string(s),true,stat); + s = (char *) calloc (32, sizeof (char)); + sprintf (s, "sockets=zeros(%d,3)", num_nodes + 1); + eval_string (std::string (s), true, stat); + for (i = 0; i <= num_nodes; i++) + { + sprintf (s, "sockets(%d,:)=[%d,0,%d]", i + 1, sock_v[i], + sock_v[i + 2 * (num_nodes + 1)]); + eval_string (std::string (s), true, stat); } - - free(sock_v); + free (sock_v); + interactive = false; - + line_editing = false; -// int retval = main_loop (); - char *newdir; int newdir_len; - read_or_exit(asock,&nl,sizeof(int)); - newdir_len=ntohl(nl); - newdir=(char *)calloc(sizeof(char),newdir_len+1); - read_or_exit(asock,newdir,newdir_len); - int cd_ok=octave_env::chdir (newdir); - if(cd_ok != true){ + read_or_exit (asock, &nl, sizeof (int)); + newdir_len = ntohl (nl); + newdir = (char *) calloc (sizeof (char), newdir_len + 1); + read_or_exit (asock, newdir, newdir_len); + int cd_ok = octave_env::chdir (newdir); + if (! cd_ok) octave_env::chdir ("/tmp"); - } reval_loop (asock); // does not return - } // parent @@ -551,11 +588,9 @@ octave_child_list::insert (pid, pserver_child_event_handler); RESTORE_SIGNALS (oset); - - close(asock); - + + close (asock); } - close(sock); - clean_up_and_exit(-1); - + close (sock); + clean_up_and_exit (-1); } Modified: trunk/octave-forge/main/parallel/src/recv.cc =================================================================== --- trunk/octave-forge/main/parallel/src/recv.cc 2011-12-23 15:58:06 UTC (rev 9458) +++ trunk/octave-forge/main/parallel/src/recv.cc 2011-12-23 18:29:45 UTC (rev 9459) @@ -64,40 +64,28 @@ sockaddr_in r_addr; struct hostent *hehe; socklen_t len = sizeof (r_addr); - getpeername (spollfd.fd, (sockaddr*)&r_addr, &len); - hehe = gethostbyaddr ((char *)&r_addr.sin_addr.s_addr, - sizeof(r_addr.sin_addr), AF_INET); + getpeername (spollfd.fd, (sockaddr*) &r_addr, &len); + hehe = gethostbyaddr ((char *) &r_addr.sin_addr.s_addr, + sizeof (r_addr.sin_addr), AF_INET); if (spollfd.revents & POLLIN) { pid = getpid (); - if (read (spollfd.fd, &nl, sizeof (int)) < - sizeof (int)) - { - error ("read error"); - } + if (read (spollfd.fd, &nl, sizeof (int)) < sizeof (int)) + error ("read error"); error_code = ntohl (nl); - if (write (spollfd.fd, &nl, sizeof (int)) < - sizeof (int)) - { - error ("write error"); - } + if (write (spollfd.fd, &nl, sizeof (int)) < sizeof (int)) + error ("write error"); error ("error occurred in %s\n\tsee " "%s:/tmp/octave_error-%s_%5d.log for detail", hehe->h_name, hehe->h_name, hehe->h_name, pid); } if (spollfd.revents & POLLERR) - { - error ("Error condition - %s", hehe->h_name); - } + error ("Error condition - %s", hehe->h_name); if (spollfd.revents & POLLHUP) - { - error("Hung up - %s", hehe->h_name); - } + error("Hung up - %s", hehe->h_name); if (spollfd.revents & POLLNVAL) - { - error("fd not open - %s", hehe->h_name); - } + error("fd not open - %s", hehe->h_name); } } Modified: trunk/octave-forge/main/parallel/src/reval.cc =================================================================== --- trunk/octave-forge/main/parallel/src/reval.cc 2011-12-23 15:58:06 UTC (rev 9458) +++ trunk/octave-forge/main/parallel/src/reval.cc 2011-12-23 18:29:45 UTC (rev 9459) @@ -69,110 +69,112 @@ { octave_value retval; - if(args.length () ==2) + if(args.length () == 2) { - int sock,row=0,col=0,nsock=0,i,j,k, fin; - int error_code,count=0,r_len=0,nl; - octave_value val=args(0); - Matrix sock_m=args(1).matrix_value(); - charMatrix cm=val.char_matrix_value(); + int sock, rows = 0, cols = 0, nsock = 0, i, j, k, fin; + int error_code, count = 0, r_len = 0, nl; + octave_value val = args(0); + Matrix sock_m = args(1).matrix_value (); + charMatrix cm = val.char_matrix_value (); char comm[256]; - - nsock=sock_m.rows(); - row=val.rows(); - col=val.columns(); + nsock = sock_m.rows (); - int num,pid; + rows = val.rows (); + cols = val.columns (); + + int num, pid; struct pollfd *pollfd; - pollfd=(struct pollfd *)malloc(nsock*sizeof(struct pollfd)); - for(i=0;i<nsock;i++){ - sock=(int)sock_m.data()[i+nsock]; - pollfd[i].fd=sock; - pollfd[i].events = POLLIN; - } + pollfd = (struct pollfd *) malloc (nsock * sizeof (struct pollfd)); + for (i = 0; i < nsock; i++) + { + sock = (int) sock_m.data ()[i+nsock]; + pollfd[i].fd = sock; + pollfd[i].events = POLLIN; + } - num=poll(pollfd,nsock,0); - if(num){ - for(k=0;k<nsock;k++){ - if(pollfd[k].revents && (pollfd[k].fd !=0)){ - sockaddr_in r_addr; - struct hostent *hehe; - socklen_t len = sizeof(r_addr); - getpeername(pollfd[k].fd, (sockaddr*)&r_addr, &len ); - hehe=gethostbyaddr((char *)&r_addr.sin_addr.s_addr,sizeof(r_addr.sin_addr), AF_INET); + num = poll (pollfd, nsock, 0); + if (num) + for (k = 0; k < nsock; k++) + if (pollfd[k].revents && pollfd[k].fd) + { + sockaddr_in r_addr; + struct hostent *hehe; + socklen_t len = sizeof (r_addr); + getpeername (pollfd[k].fd, (sockaddr *) &r_addr, &len ); + hehe = gethostbyaddr ((char *) &r_addr.sin_addr.s_addr, + sizeof (r_addr.sin_addr), AF_INET); - if(pollfd[k].revents&POLLIN){ - pid=getpid(); - if (read (pollfd[k].fd, &nl, sizeof (int)) < sizeof (int)) + if (pollfd[k].revents & POLLIN) { - error ("read error"); + pid = getpid (); + if (read (pollfd[k].fd, &nl, sizeof (int)) < sizeof (int)) + { + error ("read error"); + break; + } + error_code = ntohl (nl); + if (write (pollfd[k].fd, &nl, sizeof (int)) < sizeof (int)) + { + error ("write error"); + break; + } + error ("error occurred in %s\n\tsee " + "%s:/tmp/octave_error-%s_%5d.log for detail", + hehe->h_name, hehe->h_name, hehe->h_name, pid); + } + if (pollfd[k].revents & POLLERR) + { + error ("Error condition - %s", hehe->h_name); break; } - error_code=ntohl(nl); - if (write (pollfd[k].fd, &nl, sizeof (int)) < sizeof (int)) + if (pollfd[k].revents & POLLHUP) { - error ("write error"); + error ("Hung up - %s", hehe->h_name); break; } - error("error occurred in %s\n\tsee %s:/tmp/octave_error-%s_%5d.log for detail",hehe->h_name,hehe->h_name,hehe->h_name,pid ); + if (pollfd[k].revents & POLLNVAL) + { + error ("fd not open - %s", hehe->h_name); + break; + } } - if(pollfd[k].revents&POLLERR){ - error("Error condition - %s",hehe->h_name ); - break; - } - if(pollfd[k].revents&POLLHUP){ - error("Hung up - %s",hehe->h_name ); - break; - } - if(pollfd[k].revents & POLLNVAL){ - error ("fd not open - %s", hehe->h_name); - break; - } - } - } - } free (pollfd); if (error_state) return retval; - for(i=0;i<nsock;i++){ - sock=(int)sock_m.data()[i+nsock]; - if(sock!=0){ - for(j=0;j<row;j++){ - strncpy(comm,(cm.extract(j,0,j,col-1).data()),col); - comm[col]='\n'; - comm[col+1]='\0'; - nl=htonl(col); - if (write(sock,&nl,sizeof(int)) < sizeof (int)) + for (i = 0; i < nsock; i++) + { + sock = (int) sock_m.data ()[i+nsock]; + if (sock) + for(j=0;j<rows;j++) { - error ("write error"); - return retval; + strncpy (comm, cm.extract (j, 0, j, cols - 1).data (), cols); + comm[cols] = '\n'; + comm[cols+1] = '\0'; + nl = htonl (cols); + if (write (sock, &nl, sizeof (int)) < sizeof (int)) + { + error ("write error"); + return retval; + } + count = 0; + r_len = BUFF_SIZE; + while (count < cols) + { + if (cols - count < BUFF_SIZE) + r_len = cols - count; + count += (fin = write (sock, (comm + count), r_len)); + if (fin <= 0) + { + error ("write error"); + return retval; + } + } } - count=0; - r_len=BUFF_SIZE; - while(count <col){ - if((col-count) < BUFF_SIZE) - r_len=col-count; - count += (fin = write (sock, (comm + count), r_len)); - if (fin <= 0) - { - error ("write error"); - return retval; - } - } - - // Blocking Execution -// read(sock,&error_state,sizeof(int)); -// if(error_state){ -// error("Error occurred on %d" sock); -// } - - } } - } } else print_usage (); Modified: trunk/octave-forge/main/parallel/src/sclose.cc =================================================================== --- trunk/octave-forge/main/parallel/src/sclose.cc 2011-12-23 15:58:06 UTC (rev 9458) +++ trunk/octave-forge/main/parallel/src/sclose.cc 2011-12-23 18:29:45 UTC (rev 9459) @@ -48,79 +48,81 @@ DEFUN_DLD (sclose, args, , "sclose (sockets)\n\ \n\ -Close sockets") +Close sockets.") { octave_value retval; - errno=0; + errno = 0; - if(args.length () == 1) + if (args.length () == 1) { - int i,nsock=0,sock,k,err=0,nl, rows; + int i, nsock = 0, sock, k, err = 0, nl, rows; - rows = args(0).matrix_value().rows(); + rows = args(0).matrix_value().rows (); nsock = rows * 2; - - if((int)args(0).matrix_value().data()[0]==0){ - int num,pid; - struct pollfd *pollfd; - pollfd=(struct pollfd *)malloc(nsock*sizeof(struct pollfd)); - for(i=0;i<nsock;i++){ - sock=(int)args(0).matrix_value().data()[i+nsock]; - pollfd[i].fd=sock; - pollfd[i].events=POLLIN; - } - - num=poll(pollfd,nsock,0); - if(num){ - for(k=0;k<nsock;k++){ - if(pollfd[k].revents && (pollfd[k].fd !=0)){ - sockaddr_in r_addr; - struct hostent *hehe; - socklen_t len = sizeof(r_addr); - getpeername(pollfd[k].fd, (sockaddr*)&r_addr, &len ); - hehe=gethostbyaddr((char *)&r_addr.sin_addr.s_addr,sizeof(r_addr.sin_addr), AF_INET); - - if(pollfd[k].revents&POLLIN){ - pid=getpid(); - if (read(pollfd[k].fd,&nl,sizeof(int)) < (ssize_t)sizeof(int)) - error ("read error"); - if (! error_state) - if (write(pollfd[k].fd,&nl,sizeof(int)) < (ssize_t)sizeof(int)) - error ("write error"); - error("error occurred in %s\n\tsee %s:/tmp/octave_error-%s_%5d.log for detail",hehe->h_name,hehe->h_name,hehe->h_name,pid ); + if ((int) args(0).matrix_value ().data ()[0] == 0) + { + int num, pid; + struct pollfd *pollfd; + pollfd = (struct pollfd *) malloc (nsock * sizeof (struct pollfd)); + for (i = 0; i < nsock; i++) + { + sock = (int) args(0).matrix_value ().data ()[i+nsock]; + pollfd[i].fd = sock; + pollfd[i].events = POLLIN; + } + + num = poll (pollfd, nsock, 0); + if(num) + for(k=0;k<nsock;k++) + if(pollfd[k].revents && (pollfd[k].fd !=0)) + { + sockaddr_in r_addr; + struct hostent *hehe; + socklen_t len = sizeof (r_addr); + getpeername (pollfd[k].fd, (sockaddr *) &r_addr, &len); + hehe = gethostbyaddr ((char *) &r_addr.sin_addr.s_addr, + sizeof (r_addr.sin_addr), AF_INET); + + if (pollfd[k].revents & POLLIN) + { + pid=getpid(); + + if (read (pollfd[k].fd, &nl, sizeof(int)) < + (ssize_t) sizeof (int)) + error ("read error"); + if (! error_state) + if (write (pollfd[k].fd, &nl, sizeof(int)) < + (ssize_t) sizeof (int)) + error ("write error"); + error ("error occurred in %s\n\tsee " + "%s:/tmp/octave_error-%s_%5d.log for detail", + hehe->h_name, hehe->h_name, hehe->h_name, pid); + } + if (pollfd[k].revents & POLLERR) + error ("Error condition - %s", hehe->h_name); + if (pollfd[k].revents & POLLHUP) + error ("Hung up - %s", hehe->h_name); + if (pollfd[k].revents & POLLNVAL) + error ("fd not open - %s", hehe->h_name); } - if(pollfd[k].revents&POLLERR){ - error("Error condition - %s",hehe->h_name ); - } - if(pollfd[k].revents&POLLHUP){ - error("Hung up - %s",hehe->h_name ); - } - if(pollfd[k].revents&POLLNVAL){ - error("fd not open - %s",hehe->h_name ); - } - } - } } - } - for(i=nsock-1;i>=rows;i--){ - sock=(int)args(0).matrix_value().data()[i]; - if(sock!=0){ - if(close(sock)!=0) + for (i = nsock - 1; i >= rows; i--) + { + sock = (int) args(0).matrix_value ().data ()[i]; + if (sock && close (sock)) err++; } - } - for(i=rows-1;i>=0;i--){ - sock=(int)args(0).matrix_value().data()[i]; - if(sock!=0){ - if(octave_stream_list::remove (octave_value (sock), "") != 0) + for (i = rows - 1; i >= 0; i--) + { + sock = (int) args(0).matrix_value ().data ()[i]; + if (sock && octave_stream_list::remove (octave_value (sock), "")) err++; } - } - if(err) - error("sclose error %d",err); - retval=(double)err; + if (err) + error("sclose error %d", err); + retval = (double) err; } else print_usage (); Modified: trunk/octave-forge/main/parallel/src/send.cc =================================================================== --- trunk/octave-forge/main/parallel/src/send.cc 2011-12-23 15:58:06 UTC (rev 9458) +++ trunk/octave-forge/main/parallel/src/send.cc 2011-12-23 18:29:45 UTC (rev 9459) @@ -71,9 +71,9 @@ sockaddr_in r_addr; struct hostent *hehe; socklen_t len = sizeof (r_addr); - getpeername (pollfd[k].fd, (sockaddr*)&r_addr, &len); - hehe = gethostbyaddr ((char *)&r_addr.sin_addr.s_addr, - sizeof(r_addr.sin_addr), AF_INET); + getpeername (pollfd[k].fd, (sockaddr*) &r_addr, &len); + hehe = gethostbyaddr ((char *) &r_addr.sin_addr.s_addr, + sizeof (r_addr.sin_addr), AF_INET); if (pollfd[k].revents & POLLIN) { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |