assorted-commits Mailing List for Assorted projects (Page 36)
Brought to you by:
yangzhang
From: <yan...@us...> - 2008-10-22 22:49:54
Revision: 1027
          http://assorted.svn.sourceforge.net/assorted/?rev=1027&view=rev
Author:   yangzhang
Date:     2008-10-22 22:49:51 +0000 (Wed, 22 Oct 2008)

Log Message:
-----------
fixed bug coexisting with existing filters

Modified Paths:
--------------
    wp-easy-filter/trunk/src/easyfilt.php

Modified: wp-easy-filter/trunk/src/easyfilt.php
===================================================================
--- wp-easy-filter/trunk/src/easyfilt.php 2008-10-22 22:40:35 UTC (rev 1026)
+++ wp-easy-filter/trunk/src/easyfilt.php 2008-10-22 22:49:51 UTC (rev 1027)
@@ -41,7 +41,6 @@
 $debug = false;
 $debugfile = '/mit/y_z/web_scripts/wp/wp-content/plugins/easyfilt/log.txt';
 add_filter('the_content', 'filter_custom', 1);
-remove_filter('the_content', 'wpautop');
 
 #
 # Constants.
@@ -62,6 +61,7 @@
   $lines = explode("\n", $content);
   $tag = trim(substr($lines[0], 2));
   if (substr($lines[0], 0, 2) === '#!' && $tag2cmd[$tag]) {
+    remove_filter('the_content', 'wpautop');
     $cmd = $tag2cmd[$tag];
     $process = proc_open($cmd, $descriptorspec, $pipes);
     $body = implode("\n", array_slice($lines, 1));
@@ -75,6 +75,7 @@
       $retval = proc_close($process);
     }
   } else {
+    add_filter('the_content', 'wpautop');
     $filtered = $content;
   }
   if ($debug) {

This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
From: <yan...@us...> - 2008-10-22 22:40:47
Revision: 1026
          http://assorted.svn.sourceforge.net/assorted/?rev=1026&view=rev
Author:   yangzhang
Date:     2008-10-22 22:40:35 +0000 (Wed, 22 Oct 2008)

Log Message:
-----------
added wp-easy-filter src and README

Modified Paths:
--------------
    shell-tools/trunk/src/bash-commons/common.bash

Added Paths:
-----------
    wp-easy-filter/trunk/README
    wp-easy-filter/trunk/src/easyfilt.php

Modified: shell-tools/trunk/src/bash-commons/common.bash
===================================================================
--- shell-tools/trunk/src/bash-commons/common.bash 2008-10-22 20:02:01 UTC (rev 1025)
+++ shell-tools/trunk/src/bash-commons/common.bash 2008-10-22 22:40:35 UTC (rev 1026)
@@ -271,7 +271,7 @@
   find "$@" -type l |
   while read file ; do
     if [ ! -e "$file" ] ; then
-      sudo -u pkg rm "$file"
+      rm "$file"
     fi
   done
 }

Added: wp-easy-filter/trunk/README
===================================================================
--- wp-easy-filter/trunk/README (rev 0)
+++ wp-easy-filter/trunk/README 2008-10-22 22:40:35 UTC (rev 1026)
@@ -0,0 +1,41 @@
+Overview
+--------
+
+This is a simple, general filter plug-in for WordPress. You specify a mapping
+from tags to commands, such as:
+
+    $tag2cmd = array('pandoc' => '/usr/bin/pandoc -s --tab-stop=2');
+
+Then, for posts which are prefixed with a shebang line containing that tag, as in:
+
+    #!pandoc
+
+    Hello, world.
+
+Then the plug-in will feed the post contents to that command's stdin and return
+the rendered output to WordPress for display.
+
+This plug-in was designed to allow me to start using [Pandoc] for writing my
+blog posts. (I couldn't force myself to use the [PHP Markdown Extras]
+plug-in.)
+
+It disables the `wpautop` filter, which automatically inserts `<p>` tags (among
+other magic), because that filter cannot properly parse the style of HTML that
+Pandoc outputs.
+
+Setup
+-----
+
+Drop `easyfilt.php` into your `wp-content/plugins/` directory, then activate
+the plug-in from the admin interface.
+
+Notes
+-----
+
+The author of [PHP Markdown Extras] wrote an [informative blog post] describing
+problems he had getting his filter to work properly and co-exist with the other
+built-in filters.
+
+[Pandoc]: http://johnmacfarlane.net/pandoc/
+[PHP Markdown Extras]: http://michelf.com/projects/php-markdown/extra/
+[informative blog post]: http://michelf.com/weblog/2005/wordpress-text-flow-vs-markdown/

Added: wp-easy-filter/trunk/src/easyfilt.php
===================================================================
--- wp-easy-filter/trunk/src/easyfilt.php (rev 0)
+++ wp-easy-filter/trunk/src/easyfilt.php 2008-10-22 22:40:35 UTC (rev 1026)
@@ -0,0 +1,90 @@
+<?
+/*
+Plugin Name: Easy Filter
+Plugin URI: http://assorted.sf.net/wp-easy-filter/
+Description: Easy filtering of blog posts.
+Version: 1.0
+Author: Yang Zhang
+Author URI: http://www.mit.edu/~y_z/
+*/
+
+/*
+Copyright 2008 Yang Zhang
+
+This program is free software; you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation; either version 2 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+*/
+
+#
+# Configuration options.
+#
+
+$tag2cmd = array(
+  'pandoc' => '/mit/y_z/.local/armed/bin/mit-pandoc -S --tab-stop=2'
+);
+
+#
+# More configuration options.
+#
+
+$debug = false;
+$debugfile = '/mit/y_z/web_scripts/wp/wp-content/plugins/easyfilt/log.txt';
+add_filter('the_content', 'filter_custom', 1);
+remove_filter('the_content', 'wpautop');
+
+#
+# Constants.
+#
+
+$descriptorspec = $descriptorspec = array(
+  0 => array("pipe", "r"), // stdin is a pipe that the child will read from
+  1 => array("pipe", "w"), // stdout is a pipe that the child will write to
+  2 => array("pipe", "w") // stderr is a file to write to
+);
+
+#
+# Custom filter.
+#
+
+function filter_custom($content) {
+  global $debug, $tag2cmd, $wp_filter, $descriptorspec;
+  $lines = explode("\n", $content);
+  $tag = trim(substr($lines[0], 2));
+  if (substr($lines[0], 0, 2) === '#!' && $tag2cmd[$tag]) {
+    $cmd = $tag2cmd[$tag];
+    $process = proc_open($cmd, $descriptorspec, $pipes);
+    $body = implode("\n", array_slice($lines, 1));
+    if (is_resource($process)) {
+      fwrite($pipes[0], $body);
+      fclose($pipes[0]);
+
+      $filtered = stream_get_contents($pipes[1]);
+      fclose($pipes[1]);
+
+      $retval = proc_close($process);
+    }
+  } else {
+    $filtered = $content;
+  }
+  if ($debug) {
+    $f = fopen($debugfile, 'w');
+    fwrite($f, print_r($wp_filter, true));
+    fwrite($f, print_r($lines, true));
+    fwrite($f, substr($lines[0], 0, 2) . " " . (substr($lines[0], 0, 2) === '#!') . " " . $cmd . " " . $tag2cmd[$cmd]);
+    fwrite($f, "\n===\n$content\n===\n$filtered");
+    fclose($f);
+  }
+  return $filtered;
+}
+?>
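The README's tag-to-command dispatch can be sketched outside WordPress. The following is a minimal Python illustration, not the plug-in's code: it assumes the same rule (a first line of `#!tag` selects a command, and the rest of the post is piped through it), and the `upper` tag and its command are hypothetical stand-ins for something like Pandoc.

```python
import subprocess
import sys


def filter_custom(content, tag2cmd):
    """Pipe the post body through the command mapped to its shebang tag.

    Content whose first line is not a recognised '#!tag' is returned unchanged.
    """
    first, _, body = content.partition("\n")
    tag = first[2:].strip()
    if first.startswith("#!") and tag in tag2cmd:
        # Feed everything after the shebang line to the command's stdin
        # and hand back whatever it writes to stdout.
        result = subprocess.run(
            tag2cmd[tag], input=body, capture_output=True, text=True, check=True
        )
        return result.stdout
    return content


# Hypothetical mapping: the "upper" tag runs a tiny Python filter that
# upper-cases its input, standing in for a real renderer.
tag2cmd = {
    "upper": [sys.executable, "-c",
              "import sys; sys.stdout.write(sys.stdin.read().upper())"],
}

if __name__ == "__main__":
    print(filter_custom("#!upper\nhello, world.", tag2cmd))
    print(filter_custom("no shebang here", tag2cmd))
```

Passing the command as an argument list rather than a shell string is a choice made for this sketch; the PHP original hands a single command string to `proc_open`.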
From: <yan...@us...> - 2008-10-22 20:02:16
Revision: 1025
          http://assorted.svn.sourceforge.net/assorted/?rev=1025&view=rev
Author:   yangzhang
Date:     2008-10-22 20:02:01 +0000 (Wed, 22 Oct 2008)

Log Message:
-----------
fixed cabal-install

Modified Paths:
--------------
    shell-tools/trunk/src/bash-commons/common.bash

Modified: shell-tools/trunk/src/bash-commons/common.bash
===================================================================
--- shell-tools/trunk/src/bash-commons/common.bash 2008-10-22 20:00:37 UTC (rev 1024)
+++ shell-tools/trunk/src/bash-commons/common.bash 2008-10-22 20:02:01 UTC (rev 1025)
@@ -469,8 +469,8 @@
 
 cabal-install() {
   for i in "${@:-}" ; do
+    pushd "$i" &&
     local pkg="$(basename "$(pwd)" )" &&
-    pushd "$i" &&
     runhaskell Setup.*hs clean &&
     runhaskell Setup.*hs configure --user --prefix="$HOME/.local/pkg/ghc-${pkg%-*}" &&
     runhaskell Setup.*hs build &&
From: <yan...@us...> - 2008-10-22 20:00:58
Revision: 1024
          http://assorted.svn.sourceforge.net/assorted/?rev=1024&view=rev
Author:   yangzhang
Date:     2008-10-22 20:00:37 +0000 (Wed, 22 Oct 2008)

Log Message:
-----------
added cabal-install

Modified Paths:
--------------
    shell-tools/trunk/src/bash-commons/common.bash

Added Paths:
-----------
    sandbox/trunk/src/php/
    sandbox/trunk/src/php/subprocess.php
    wp-easy-filter/
    wp-easy-filter/trunk/
    wp-easy-filter/trunk/src/

Added: sandbox/trunk/src/php/subprocess.php
===================================================================
--- sandbox/trunk/src/php/subprocess.php (rev 0)
+++ sandbox/trunk/src/php/subprocess.php 2008-10-22 20:00:37 UTC (rev 1024)
@@ -0,0 +1,23 @@
+<?
+$descriptorspec = $descriptorspec = array(
+  0 => array("pipe", "r"), // stdin is a pipe that the child will read from
+  1 => array("pipe", "w"), // stdout is a pipe that the child will write to
+  2 => array("pipe", "w") // stderr is a file to write to
+);
+$cwd = '/tmp';
+$env = array('some_option' => 'option');
+
+$process = proc_open('php', $descriptorspec, $pipes, $cwd, $env);
+
+if (is_resource($process)) {
+  fwrite($pipes[0], '<? print_r($_ENV); ?>');
+  fclose($pipes[0]);
+
+  $stdout = stream_get_contents($pipes[1]);
+  fclose($pipes[1]);
+  echo $stdout;
+
+  $return_value = proc_close($process);
+  echo "command returned $return_value\n";
+}
+?>

Modified: shell-tools/trunk/src/bash-commons/common.bash
===================================================================
--- shell-tools/trunk/src/bash-commons/common.bash 2008-10-22 06:47:05 UTC (rev 1023)
+++ shell-tools/trunk/src/bash-commons/common.bash 2008-10-22 20:00:37 UTC (rev 1024)
@@ -467,6 +467,19 @@
   find "${1:-.}" -type d -empty -print0 | xargs -0r rmdir -p
 }
 
+cabal-install() {
+  for i in "${@:-}" ; do
+    local pkg="$(basename "$(pwd)" )" &&
+    pushd "$i" &&
+    runhaskell Setup.*hs clean &&
+    runhaskell Setup.*hs configure --user --prefix="$HOME/.local/pkg/ghc-${pkg%-*}" &&
+    runhaskell Setup.*hs build &&
+    runhaskell Setup.*hs install --user &&
+    popd ||
+    break
+  done
+}
+
 #if ! is_declared indent ; then
 #  noindent
 #else
From: <yan...@us...> - 2008-10-22 06:47:14
Revision: 1023
          http://assorted.svn.sourceforge.net/assorted/?rev=1023&view=rev
Author:   yangzhang
Date:     2008-10-22 06:47:05 +0000 (Wed, 22 Oct 2008)

Log Message:
-----------
added a short readme to sync-svn-props

Added Paths:
-----------
    sandbox/trunk/src/one-off-scripts/sync-svn-props/README

Added: sandbox/trunk/src/one-off-scripts/sync-svn-props/README
===================================================================
--- sandbox/trunk/src/one-off-scripts/sync-svn-props/README (rev 0)
+++ sandbox/trunk/src/one-off-scripts/sync-svn-props/README 2008-10-22 06:47:05 UTC (rev 1023)
@@ -0,0 +1,3 @@
+Given two svn repositories with the same files, replace the svn properties in
+all the files in the destination repository with those of the source
+repository. The destination is hardcoded as ~/jz, and the source ~/hz.
From: <yan...@us...> - 2008-10-22 06:45:31
Revision: 1022
          http://assorted.svn.sourceforge.net/assorted/?rev=1022&view=rev
Author:   yangzhang
Date:     2008-10-22 06:45:21 +0000 (Wed, 22 Oct 2008)

Log Message:
-----------
added sync-svn-props

Added Paths:
-----------
    sandbox/trunk/src/one-off-scripts/sync-svn-props/
    sandbox/trunk/src/one-off-scripts/sync-svn-props/sync.bash

Added: sandbox/trunk/src/one-off-scripts/sync-svn-props/sync.bash
===================================================================
--- sandbox/trunk/src/one-off-scripts/sync-svn-props/sync.bash (rev 0)
+++ sandbox/trunk/src/one-off-scripts/sync-svn-props/sync.bash 2008-10-22 06:45:21 UTC (rev 1022)
@@ -0,0 +1,17 @@
+#!/usr/bin/env bash
+
+set -o errexit -o nounset
+
+cd ~/hz/
+
+svn ls -R | while read i ; do
+  j=~/jz/"$i"
+
+  svn pl "$j" | tail -n +2 | while read p ; do
+    svn pd "$p" "$j"
+  done
+
+  svn pl "$i" | tail -n +2 | while read p ; do
+    svn ps "$p" "$( svn pg "$p" "$i" )" "$j"
+  done
+done

Property changes on: sandbox/trunk/src/one-off-scripts/sync-svn-props/sync.bash
___________________________________________________________________
Added: svn:executable
   + *
From: <yan...@us...> - 2008-10-19 20:50:34
Revision: 1021
          http://assorted.svn.sourceforge.net/assorted/?rev=1021&view=rev
Author:   yangzhang
Date:     2008-10-19 20:50:21 +0000 (Sun, 19 Oct 2008)

Log Message:
-----------
fixed misconfig

Modified Paths:
--------------
    configs/trunk/src/ssh/config

Modified: configs/trunk/src/ssh/config
===================================================================
--- configs/trunk/src/ssh/config 2008-10-18 17:45:20 UTC (rev 1020)
+++ configs/trunk/src/ssh/config 2008-10-19 20:50:21 UTC (rev 1021)
@@ -77,7 +77,7 @@
 Host zs
   HostName zs.ath.cx
   User yang
-  LocalForward 5901:localhost:5900
+  LocalForward 5901 localhost:5900
 
 # Rohan K
 Host kr
@@ -109,7 +109,7 @@
   HostName harvard.csail.mit.edu
   User yang
   ForwardX11 yes
-  LocalForward 5902:localhost:5900
+  LocalForward 5902 localhost:5900
 
 Host cs
   HostName login.csail.mit.edu
From: <yan...@us...> - 2008-10-18 17:45:34
Revision: 1020
          http://assorted.svn.sourceforge.net/assorted/?rev=1020&view=rev
Author:   yangzhang
Date:     2008-10-18 17:45:20 +0000 (Sat, 18 Oct 2008)

Log Message:
-----------
added local port forwarding for vnc

Modified Paths:
--------------
    configs/trunk/src/ssh/config

Modified: configs/trunk/src/ssh/config
===================================================================
--- configs/trunk/src/ssh/config 2008-10-18 08:12:24 UTC (rev 1019)
+++ configs/trunk/src/ssh/config 2008-10-18 17:45:20 UTC (rev 1020)
@@ -77,6 +77,7 @@
 Host zs
   HostName zs.ath.cx
   User yang
+  LocalForward 5901:localhost:5900
 
 # Rohan K
 Host kr
@@ -108,6 +109,7 @@
   HostName harvard.csail.mit.edu
   User yang
   ForwardX11 yes
+  LocalForward 5902:localhost:5900
 
 Host cs
   HostName login.csail.mit.edu
From: <yan...@us...> - 2008-10-18 08:12:37
Revision: 1019
          http://assorted.svn.sourceforge.net/assorted/?rev=1019&view=rev
Author:   yangzhang
Date:     2008-10-18 08:12:24 +0000 (Sat, 18 Oct 2008)

Log Message:
-----------
added demo of `protected`'s subtleties

Added Paths:
-----------
    sandbox/trunk/src/java/protected/
    sandbox/trunk/src/java/protected/pkga/
    sandbox/trunk/src/java/protected/pkga/A.java
    sandbox/trunk/src/java/protected/pkga/B.java
    sandbox/trunk/src/java/protected/pkgb/
    sandbox/trunk/src/java/protected/pkgb/B.java

Added: sandbox/trunk/src/java/protected/pkga/A.java
===================================================================
--- sandbox/trunk/src/java/protected/pkga/A.java (rev 0)
+++ sandbox/trunk/src/java/protected/pkga/A.java 2008-10-18 08:12:24 UTC (rev 1019)
@@ -0,0 +1,4 @@
+package pkga;
+public class A {
+  protected int x;
+}

Added: sandbox/trunk/src/java/protected/pkga/B.java
===================================================================
--- sandbox/trunk/src/java/protected/pkga/B.java (rev 0)
+++ sandbox/trunk/src/java/protected/pkga/B.java 2008-10-18 08:12:24 UTC (rev 1019)
@@ -0,0 +1,7 @@
+package pkga;
+public class B extends A {
+  public int f() { return x; }
+  public int g(B b) { return b.x; }
+  // works fine
+  public int h(A a) { return a.x; }
+}

Added: sandbox/trunk/src/java/protected/pkgb/B.java
===================================================================
--- sandbox/trunk/src/java/protected/pkgb/B.java (rev 0)
+++ sandbox/trunk/src/java/protected/pkgb/B.java 2008-10-18 08:12:24 UTC (rev 1019)
@@ -0,0 +1,8 @@
+package pkgb;
+import pkga.*;
+public class B extends A {
+  public int f() { return x; }
+  public int g(B b) { return b.x; }
+  // doesn't work
+  // public int h(A a) { return a.x; }
+}
From: <yan...@us...> - 2008-10-14 03:35:43
Revision: 1018
          http://assorted.svn.sourceforge.net/assorted/?rev=1018&view=rev
Author:   yangzhang
Date:     2008-10-14 03:35:37 +0000 (Tue, 14 Oct 2008)

Log Message:
-----------
added span, group_as_subseqs; added unittests for span, group_as_subseqs

Modified Paths:
--------------
    python-commons/trunk/src/commons/seqs.py

Modified: python-commons/trunk/src/commons/seqs.py
===================================================================
--- python-commons/trunk/src/commons/seqs.py 2008-10-14 03:34:36 UTC (rev 1017)
+++ python-commons/trunk/src/commons/seqs.py 2008-10-14 03:35:37 UTC (rev 1018)
@@ -1,6 +1,12 @@
 # -*- mode: python; tab-width: 4; indent-tabs-mode: nil; py-indent-offset: 4; -*-
 # vim:ft=python:et:sw=4:ts=4
 
+"""
+Sequences, streams, and generators.
+
+@var default_chunk_size: The default chunk size used by L{chunkify}.
+"""
+
 from __future__ import ( absolute_import, with_statement )
 
 from cStringIO import StringIO
@@ -8,15 +14,37 @@
 from struct import pack, unpack
 from contextlib import closing
 from itertools import ( chain, count, ifilterfalse, islice,
-                        izip, repeat, tee )
-from .log import warning
+                        izip, repeat, tee, takewhile )
+from commons.log import warning
 
-"""
-Sequences, streams, and generators.
+__all__ = '''
+default_chunk_size
+read_pickle
+read_pickles
+safe_pickler
+write_pickle
+streamlen
+chunkify
+total
+ClosedError
+PersistentConsumedSeq
+PersistentSeq
+pairwise
+argmax
+argmin
+all
+concat
+flatten
+grouper
+chunker
+countstep
+take
+delimit
+interleave
+group_as_subseqs
+span
+'''.split()
 
-@var default_chunk_size: The default chunk size used by L{chunkify}.
-"""
+
 default_chunk_size = 8192
 
 def read_pickle( read, init = '', length_thresh = 100000 ):
@@ -283,15 +311,6 @@
     else:
         return min((fn(e), e) for e in sequence)[1]
 
-def all(seq, pred=bool):
-    """
-    Returns C{True} if C{pred(x) is True} for every element in the
-    iterable
-    """
-    for elem in ifilterfalse(pred, seq):
-        return False
-    return True
-
 def concat(listOfLists):
     return list(chain(*listOfLists))
 
@@ -364,3 +383,53 @@
 # TODO not quite right
 def interleave(xs, ys):
     return concat(izip( xs, ys ))
+
+def span(pred, xs):
+    """
+    Returns (successes, failures), where successes is the sequence of any
+    consecutive elements at the head of L{xs} that satisfy the predicate, and
+    second is everything else.
+    """
+    xs = iter(xs)
+    first_failure = []
+    def successes():
+        for x in xs:
+            if not pred(x):
+                first_failure.append(x)
+                break
+            yield x
+    return list(successes()), chain(first_failure, xs)
+
+def group_as_subseqs(xs, key = lambda x: x):
+    """
+    Takes a sequence and breaks it up into multiple subsequences, which are
+    groups keyed on L{key}.
+    """
+    xs = iter(xs)
+    while True:
+        setfirst = False
+        for x in xs:
+            first = x
+            setfirst = True
+            break
+        if not setfirst: break # We've hit the end
+        firstkey = key(first)
+        successes, xs = span(lambda x: key(x) == firstkey, xs)
+        yield chain([first], successes)
+
+import unittest
+
+class test_seqs(unittest.TestCase):
+    def test_span(self):
+        xs,ys = span(lambda x: x < 5, range(10))
+        self.assertEqual(list(xs), range(5))
+        self.assertEqual(list(ys), range(5,10))
+    def test_group(self):
+        a,b,c,d = group_as_subseqs(range(10), lambda x: x / 3)
+        self.assertEqual(list(a), range(0,3))
+        self.assertEqual(list(b), range(3,6))
+        self.assertEqual(list(c), range(6,9))
+        self.assertEqual(list(d), range(9,10))
+
+if __name__ == '__main__':
+    unittest.main()
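The committed `span` and `group_as_subseqs` target Python 2 (`izip`, `ifilterfalse`, `x / 3` as integer division, `range` returning a list). A rough Python 3 rendering of the same idea might look like the sketch below. It is an illustration and not part of the commit, and it returns each group as a list rather than a `chain`.

```python
from itertools import chain


def span(pred, xs):
    """Split xs into (leading elements satisfying pred, iterator over the rest)."""
    xs = iter(xs)
    successes, first_failure = [], []
    for x in xs:
        if not pred(x):
            # Keep the element that ended the run so it is not lost.
            first_failure.append(x)
            break
        successes.append(x)
    return successes, chain(first_failure, xs)


def group_as_subseqs(xs, key=lambda x: x):
    """Yield lists of consecutive elements that share the same key."""
    xs = iter(xs)
    while True:
        try:
            first = next(xs)
        except StopIteration:
            return  # input exhausted
        firstkey = key(first)
        same, xs = span(lambda x: key(x) == firstkey, xs)
        yield [first] + same


if __name__ == "__main__":
    head, rest = span(lambda x: x < 5, range(10))
    print(head, list(rest))
    # Floor division (//) plays the role of Python 2's integer "/".
    print(list(group_as_subseqs(range(10), lambda x: x // 3)))
```

The standard library's `itertools.groupby` covers the grouping case directly; the hand-written version is shown only because it mirrors the structure of the commit.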
From: <yan...@us...> - 2008-10-14 03:34:51
Revision: 1017
          http://assorted.svn.sourceforge.net/assorted/?rev=1017&view=rev
Author:   yangzhang
Date:     2008-10-14 03:34:36 +0000 (Tue, 14 Oct 2008)

Log Message:
-----------
added remove_empty_lines, nat_lang_join, or_join, and_join, unit tests for nat_lang_join et al.

Modified Paths:
--------------
    python-commons/trunk/src/commons/strs.py

Modified: python-commons/trunk/src/commons/strs.py
===================================================================
--- python-commons/trunk/src/commons/strs.py 2008-10-14 03:32:58 UTC (rev 1016)
+++ python-commons/trunk/src/commons/strs.py 2008-10-14 03:34:36 UTC (rev 1017)
@@ -6,18 +6,22 @@
 """
 
 __all__ = '''
+and_join
+cp1252_to_unicode
+cp1252_to_unicode_translations
+dos2unix
 format
+indent
+nat_lang_join
+or_join
+remove_empty_lines
 safe_ascii
-cp1252_to_unicode_translations
-cp1252_to_unicode
-unwrap
-indent
 underline
-dos2unix
 unicode2html
+unwrap
 '''.split()
 
-import itertools, cgi, re
+import itertools, cgi, re, unittest
 
 def format( *args ):
     """Formats the args as they would be by the C{print} built-in."""
@@ -86,6 +90,27 @@
     if isinstance(s, str): s = s.split('\n')
     return '\n'.join( ind + line for line in s )
 
+def unindent(text, amt = None):
+    """
+    If L{amt} is 0, removes all leading whitespace from each line in L{text}.
+    If L{amt} is L{None}, finds the smallest amount of leading whitespace on
+    any non-empty line and removes that many chars from each line. If L{amt}
+    is positive, removes L{amt} chars from each line.
+    """
+    lines = text.split('\n')
+    if amt == 0:
+        return '\n'.join( line.lstrip() for line in lines )
+    def count_indent(line):
+        for i,c in enumerate(line):
+            if not c.isspace(): return i
+    if amt is None:
+        amt = min( count_indent(line) for line in lines if line.strip() != '' )
+    return '\n'.join( line[amt:] for line in lines )
+
+def remove_empty_lines(s):
+    "Removes all empty lines (or lines of just whitespace)."
+    return '\n'.join( line for line in s.split('\n') if line.strip() != '' )
+
 def underline(s, sep):
     """
     Appends to L{s} a newline and a number of repetitions of L{sep}; the number
@@ -101,3 +126,41 @@
 def unicode2html(s):
     "Extends cgi.escape() with escapes for all unicode characters."
     return pat.sub(lambda m: '&#%d;' % ord(m.group()), cgi.escape(s))
+
+def nat_lang_join(xs, last_glue, two_glue = None, glue = ', '):
+    """
+    Natural-language join. Join a sequence of strings together into a
+    comma-separated list, but where the last pair is joined with the given
+    special glue. (You may also override the non-last glue, which defaults to
+    a ', '.)
+
+    @param xs: The sequence of strings. This must be a list-like sequence, not
+    a generated one.
+
+    @param last_glue: The string used to join the final pair of elements, when
+    there are more than two elements.
+
+    @param two_glue: The string used to join both elements in a 2-element
+    sequence. Defaults to None, which means to use last_glue.
+
+    @param glue: The string used to join all the other elements.
+    """
+    if len(xs) == 0: return ''
+    elif len(xs) == 1: return xs[0]
+    elif len(xs) == 2: return xs[0] + two_glue + xs[1]
+    else: return glue.join(xs[:-1]) + last_glue + xs[-1]
+
+def or_join(xs): return nat_lang_join(xs, ', or ', ' or ')
+def and_join(xs): return nat_lang_join(xs, ', and ', ' and ')
+
+class str_test( unittest.TestCase ):
+    def test_nat_lang_join( self ):
+        self.assertEqual( nat_lang_join( 'alpha beta gamma'.split(), ' | ' ),
+                          'alpha, beta | gamma' )
+        self.assertEqual( and_join( 'alpha beta gamma'.split() ),
+                          'alpha, beta, and gamma' )
+        self.assertEqual( or_join( 'alpha beta'.split() ),
+                          'alpha or beta' )
+
+if __name__ == '__main__':
+    unittest.main()
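One detail of `nat_lang_join` as committed: the docstring says `two_glue` defaults to `None`, "which means to use last_glue", but the two-element branch concatenates `two_glue` directly, so calling it with a two-element list and no `two_glue` would raise a `TypeError`. The committed tests do not hit this because `or_join` and `and_join` always pass `two_glue`. A minimal sketch of the documented behaviour, written for Python 3 and not taken from the repository, could be:

```python
def nat_lang_join(xs, last_glue, two_glue=None, glue=", "):
    """Join strings into a natural-language list.

    two_glue falls back to last_glue when it is not given, as the
    original docstring describes.
    """
    if two_glue is None:
        two_glue = last_glue
    if len(xs) == 0:
        return ""
    if len(xs) == 1:
        return xs[0]
    if len(xs) == 2:
        return xs[0] + two_glue + xs[1]
    return glue.join(xs[:-1]) + last_glue + xs[-1]


def or_join(xs):
    return nat_lang_join(xs, ", or ", " or ")


def and_join(xs):
    return nat_lang_join(xs, ", and ", " and ")


if __name__ == "__main__":
    print(and_join(["alpha", "beta", "gamma"]))
    print(or_join(["alpha", "beta"]))
    print(nat_lang_join(["alpha", "beta"], " | "))
```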
From: <yan...@us...> - 2008-10-14 03:33:17
Revision: 1016
          http://assorted.svn.sourceforge.net/assorted/?rev=1016&view=rev
Author:   yangzhang
Date:     2008-10-14 03:32:58 +0000 (Tue, 14 Oct 2008)

Log Message:
-----------
added cleartimeout; changed wrap from function to method

Modified Paths:
--------------
    python-commons/trunk/src/commons/misc.py

Modified: python-commons/trunk/src/commons/misc.py
===================================================================
--- python-commons/trunk/src/commons/misc.py 2008-10-10 22:21:12 UTC (rev 1015)
+++ python-commons/trunk/src/commons/misc.py 2008-10-14 03:32:58 UTC (rev 1016)
@@ -7,6 +7,7 @@
 
 __all__ = '''
 TerminalController
+cleartimeout
 days
 default_if_none
 generate_bit_fields
@@ -18,7 +19,6 @@
 tc
 timeout_exception
 wall_clock
-wrap_color
 '''.split()
 
 #
@@ -108,6 +108,8 @@
     signal(SIGALRM, handle)
     alarm(secs)
 
+def cleartimeout(): alarm(0)
+
 #
 # Functional
 #
@@ -297,18 +299,17 @@
         if s == '$$': return s
         else: return getattr(self, s[2:-1])
 
+    def wrap(self, s, color):
+        """
+        Wraps L{s} in L{color} (resets color to NORMAL at the end).
+        """
+        return self.render( '${%s}%s${NORMAL}' % (color, s) )
+
 remove_colors_pat = re.compile('\033\\[[0-9;]*m')
 def remove_colors(s):
     'Removes ANSI escape codes (e.g. those for terminal colors).'
     return remove_colors_pat.sub('', s)
 
-tc = TerminalController()
-def wrap_color(s, color, tc = tc):
-    """
-    Wraps L{s} in L{color} (resets color to NORMAL at the end).
-    """
-    return tc.render( '${%s}%s${NORMAL}' % (color, s) )
-
 import unittest
 
 class color_test( unittest.TestCase ):
From: <yan...@us...> - 2008-10-10 22:21:21
Revision: 1015
          http://assorted.svn.sourceforge.net/assorted/?rev=1015&view=rev
Author:   yangzhang
Date:     2008-10-10 22:21:12 +0000 (Fri, 10 Oct 2008)

Log Message:
-----------
added phony to makefile

Modified Paths:
--------------
    simple-build/trunk/src/Makefile

Modified: simple-build/trunk/src/Makefile
===================================================================
--- simple-build/trunk/src/Makefile 2008-10-10 22:20:50 UTC (rev 1014)
+++ simple-build/trunk/src/Makefile 2008-10-10 22:21:12 UTC (rev 1015)
@@ -18,3 +18,5 @@
 
 clean:
 	rm -rf meta
+
+.PHONY: clean run all
From: <yan...@us...> - 2008-10-10 22:21:00
Revision: 1014
          http://assorted.svn.sourceforge.net/assorted/?rev=1014&view=rev
Author:   yangzhang
Date:     2008-10-10 22:20:50 +0000 (Fri, 10 Oct 2008)

Log Message:
-----------
added change history

Modified Paths:
--------------
    scala-commons/trunk/README

Modified: scala-commons/trunk/README
===================================================================
--- scala-commons/trunk/README 2008-10-10 22:19:43 UTC (rev 1013)
+++ scala-commons/trunk/README 2008-10-10 22:20:50 UTC (rev 1014)
@@ -38,6 +38,17 @@
 
 [SimpleBuild]: http://assorted.sf.net/simple-build/
 
+Changes
+-------
+
+version 0.2
+
+- updated for Scala 2.7.2
+
+version 0.1
+
+- initial release, for use with Scala 2.7.0
+
 Related work
 ------------
 
From: <yan...@us...> - 2008-10-10 22:19:52
Revision: 1013
          http://assorted.svn.sourceforge.net/assorted/?rev=1013&view=rev
Author:   yangzhang
Date:     2008-10-10 22:19:43 +0000 (Fri, 10 Oct 2008)

Log Message:
-----------
more words

Modified Paths:
--------------
    configs/trunk/src/aspell/aspell.en.pws

Modified: configs/trunk/src/aspell/aspell.en.pws
===================================================================
--- configs/trunk/src/aspell/aspell.en.pws 2008-10-10 22:18:27 UTC (rev 1012)
+++ configs/trunk/src/aspell/aspell.en.pws 2008-10-10 22:19:43 UTC (rev 1013)
@@ -8,7 +8,6 @@
 arg
 RDBMS
 SerAccel
-al
 uplink
 thresholding
 hypercall
@@ -123,3 +122,38 @@
 pseudocode
 McRT
 ILP
+IP
+SureRoute
+Skype
+CDF
+BGP
+Stoica
+multi
+al
+dst
+failover
+scalably
+failovers
+BitTorrent
+Kbps
+Katz
+Akamai's
+sqrt
+ISP
+SOSR
+overlay's
+Fei
+Nakao
+VPNs
+VoIP
+reachability
+Cui
+RON's
+PlanetLab's
+Chernoff
+priori
+src
+NIO
+RTT
+PlanetLab
+liveness
From: <yan...@us...> - 2008-10-10 22:18:41
Revision: 1012
          http://assorted.svn.sourceforge.net/assorted/?rev=1012&view=rev
Author:   yangzhang
Date:     2008-10-10 22:18:27 +0000 (Fri, 10 Oct 2008)

Log Message:
-----------
added dupes for printing duplicate lines

Modified Paths:
--------------
    shell-tools/trunk/src/bash-commons/assorted.bash

Added Paths:
-----------
    shell-tools/trunk/src/dupes.py

Modified: shell-tools/trunk/src/bash-commons/assorted.bash
===================================================================
--- shell-tools/trunk/src/bash-commons/assorted.bash 2008-10-09 06:23:31 UTC (rev 1011)
+++ shell-tools/trunk/src/bash-commons/assorted.bash 2008-10-10 22:18:27 UTC (rev 1012)
@@ -208,6 +208,7 @@
 
 publish() {
   stage
+  scp -o Compression=on -o CompressionLevel=9 "$stagedir"
   tar czf - -C "$stagedir" . |
   ssh $prof "
     set -o errexit -o nounset

Added: shell-tools/trunk/src/dupes.py
===================================================================
--- shell-tools/trunk/src/dupes.py (rev 0)
+++ shell-tools/trunk/src/dupes.py 2008-10-10 22:18:27 UTC (rev 1012)
@@ -0,0 +1,14 @@
+#!/usr/bin/env python
+
+from commons.seqs import pairwise
+from commons.startup import run_main
+from sys import stdin, stdout
+
+def main(argv):
+    cur = None
+    for a,b in pairwise(stdin):
+        if cur!=a and a==b:
+            stdout.write(a)
+            cur = a
+
+run_main()

Property changes on: shell-tools/trunk/src/dupes.py
___________________________________________________________________
Added: svn:executable
   + *
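`dupes.py` depends on the `commons` package (`pairwise`, `run_main`), so it does not run on its own. A self-contained Python 3 sketch of the same idea, printing one copy of each line that is immediately repeated, is given below. It swaps the pairwise comparison for `itertools.groupby`, and the function name `dupes` is chosen here for illustration.

```python
import sys
from itertools import groupby


def dupes(lines):
    """Yield one copy of each line that occurs two or more times in a row."""
    for line, run in groupby(lines):
        # groupby collects consecutive equal lines; report runs longer than one.
        if sum(1 for _ in run) > 1:
            yield line


if __name__ == "__main__":
    # Reads stdin and writes each adjacent-duplicate line once.
    sys.stdout.writelines(dupes(sys.stdin))
```

As with the original, only adjacent repeats are detected, so the input is presumably expected to be sorted first.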
From: <yan...@us...> - 2008-10-09 06:23:43
Revision: 1011
          http://assorted.svn.sourceforge.net/assorted/?rev=1011&view=rev
Author:   yangzhang
Date:     2008-10-09 06:23:31 +0000 (Thu, 09 Oct 2008)

Log Message:
-----------
got udp tests working again by adding receive-handling; reorganized tests in general

Modified Paths:
--------------
    java-reactor/trunk/src/reactor/AbstractDatagramHandler.java
    java-reactor/trunk/src/reactor/AbstractStreamHandler.java
    java-reactor/trunk/src/reactor/DatagramHandler.java
    java-reactor/trunk/src/reactor/DatagramSession.java
    java-reactor/trunk/src/reactor/ReactorTest.java
    java-reactor/trunk/src/reactor/SocketHandler.java
    java-reactor/trunk/src/reactor/SocketSession.java
    java-reactor/trunk/src/reactor/StreamHandler.java
    java-reactor/trunk/src/reactor/StreamSession.java

Modified: java-reactor/trunk/src/reactor/AbstractDatagramHandler.java
===================================================================
--- java-reactor/trunk/src/reactor/AbstractDatagramHandler.java 2008-10-08 22:33:44 UTC (rev 1010)
+++ java-reactor/trunk/src/reactor/AbstractDatagramHandler.java 2008-10-09 06:23:31 UTC (rev 1011)
@@ -13,15 +13,19 @@
 
     @Override
     public void handleReceive(DatagramSession session, InetSocketAddress src,
-            ByteBuffer buf) {
+            ByteBuffer buf) throws Exception {
     }
 
     @Override
-    public void handleRead(SocketSession session, ByteBuffer buf) {
+    public void handleRead(SocketSession session, ByteBuffer buf) throws Exception {
     }
 
     @Override
-    public void handleWrite(SocketSession session) {
+    public void handleWrite(SocketSession session) throws Exception {
     }
 
+    @Override
+    public void handleClose(SocketSession session) throws Exception {
+    }
+
 }

Modified: java-reactor/trunk/src/reactor/AbstractStreamHandler.java
===================================================================
--- java-reactor/trunk/src/reactor/AbstractStreamHandler.java 2008-10-08 22:33:44 UTC (rev 1010)
+++ java-reactor/trunk/src/reactor/AbstractStreamHandler.java 2008-10-09 06:23:31 UTC (rev 1011)
@@ -11,15 +11,19 @@
 public class AbstractStreamHandler implements StreamHandler {
 
     @Override
-    public void handleRead(SocketSession session, ByteBuffer buf) {
+    public void handleRead(SocketSession session, ByteBuffer buf) throws Exception {
     }
 
     @Override
-    public void handleWrite(SocketSession session) {
+    public void handleWrite(SocketSession session) throws Exception {
     }
 
     @Override
-    public void handleConnect(StreamSession session) {
+    public void handleConnect(StreamSession session) throws Exception {
     }
+
+    @Override
+    public void handleClose(SocketSession session) throws Exception {
+    }
 
 }

Modified: java-reactor/trunk/src/reactor/DatagramHandler.java
===================================================================
--- java-reactor/trunk/src/reactor/DatagramHandler.java 2008-10-08 22:33:44 UTC (rev 1010)
+++ java-reactor/trunk/src/reactor/DatagramHandler.java 2008-10-09 06:23:31 UTC (rev 1011)
@@ -24,6 +24,6 @@
      * The received packet.
      */
     public void handleReceive(DatagramSession session, InetSocketAddress src,
-            ByteBuffer buf);
+            ByteBuffer buf) throws Exception;
 
 }

Modified: java-reactor/trunk/src/reactor/DatagramSession.java
===================================================================
--- java-reactor/trunk/src/reactor/DatagramSession.java 2008-10-08 22:33:44 UTC (rev 1010)
+++ java-reactor/trunk/src/reactor/DatagramSession.java 2008-10-09 06:23:31 UTC (rev 1011)
@@ -1,5 +1,6 @@
 package reactor;
 
+import java.io.IOException;
 import java.net.DatagramSocket;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
@@ -81,9 +82,48 @@
      */
     public void send(ByteBuffer writeBuf, InetSocketAddress dst)
             throws Exception {
+        writeBuf.rewind();
         int bytes = ((DatagramChannel) channel).send(writeBuf, dst);
         // TODO: don't trigger an assertion failure!
         assert bytes == writeBuf.limit();
     }
 
+    /**
+     * Overrides superclass behavior to support promiscuous receives (calls into
+     * DatagramHandler.handleReceive()).
+     */
+    @Override
+    protected boolean read(SelectionKey key, ByteBuffer readBuf)
+            throws IOException {
+        if (channel.isConnected()) {
+            return super.read(key, readBuf);
+        } else {
+            final InetSocketAddress srcSa;
+            try {
+                srcSa = (InetSocketAddress) channel.receive(readBuf);
+                if (srcSa == null) {
+                    // Remote entity shut the socket down cleanly. Do
+                    // the same from our end and cancel the channel.
+                    close();
+                    return false;
+                }
+            } catch (IOException ex) {
+                close();
+                return false;
+            }
+
+            // after channel wrote to buf, set lim = pos, then pos =
+            // 0
+            readBuf.flip();
+            // callback
+            try {
+                handler.handleReceive(this, srcSa, readBuf);
+            } catch (Exception ex) {
+                ex.printStackTrace();
+            }
+
+            return true;
+        }
+    }
+
 }

Modified: java-reactor/trunk/src/reactor/ReactorTest.java
===================================================================
--- java-reactor/trunk/src/reactor/ReactorTest.java 2008-10-08 22:33:44 UTC (rev 1010)
+++ java-reactor/trunk/src/reactor/ReactorTest.java 2008-10-09 06:23:31 UTC (rev 1011)
@@ -1,7 +1,5 @@
 package reactor;
 
-import java.net.DatagramPacket;
-import java.net.DatagramSocket;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
@@ -24,26 +22,47 @@
         e = Executors.newCachedThreadPool();
     }
 
-    public void spawn(final Runnable r) {
-        e.submit(new Runnable() {
-            public void run() {
-                try {
-                    r.run();
-                } catch (Exception e) {
-                    e.printStackTrace();
-                }
+    abstract class MyRunnable implements Runnable {
+        public MyRunnable() {
+            e.submit(this);
+        }
+
+        public void run() {
+            try {
+                myrun();
+            } catch (Exception ex) {
+                ex.printStackTrace();
             }
-        });
+        }
+
+        abstract public void myrun() throws Exception;
     }
 
-    private Runnable makeRunnable(final int i) {
-        return new Runnable() {
-            public void run() {
-                System.out.println(i);
+    public void testScheduler() throws Exception {
+        // Start the server.
+ new MyRunnable() { + public void myrun() throws Exception { + Reactor r = new Reactor(); + + for (int i = 0; i < 10; i++) { + final int j = i; + r.schedule(new Runnable() { + public void run() { + System.out.println(j); + } + }, 200 * i, TimeUnit.MILLISECONDS); + } + + r.react(); } }; } + /** + * Run a promiscuous echo server/client, in separate reactors. + * + * @throws Exception + */ public void testUDP() throws Exception { InetAddress localhost = InetAddress.getLocalHost(); int serverPort = 11111, clientPort = 22222; @@ -51,64 +70,54 @@ serverSa = new InetSocketAddress(localhost, serverPort); clientSa = new InetSocketAddress(localhost, clientPort); - final DatagramHandler handler = new AbstractDatagramHandler() { - @Override - public void handleReceive(DatagramSession service, - InetSocketAddress src, ByteBuffer buf) { - System.out.println("received: " + buf); + // Start the server. + new MyRunnable() { + public void myrun() throws Exception { + final Reactor r = new Reactor(); + r.createDatagramSession(null, serverSa, + new AbstractDatagramHandler() { + @Override + public void handleReceive(DatagramSession session, + InetSocketAddress src, ByteBuffer buf) + throws Exception { + System.out.println("server received '" + + Charset.defaultCharset().decode(buf) + + "' from " + src); + session.send(buf, src); + r.shutdown(); + } + }); + r.react(); } }; - // Start the server. - spawn(new Runnable() { - public void run() { - try { - Reactor r = new Reactor(); - r.createDatagramSession(null, serverSa, handler); - - for (int i = 0; i < 10; i++) - r.schedule(makeRunnable(i), 200 * i, - TimeUnit.MILLISECONDS); - - Thread.sleep(1000); - r.react(); - } catch (Exception ex) { - ex.printStackTrace(); - } + // Start the client. 
+ new MyRunnable() { + public void myrun() throws Exception { + final Reactor r = new Reactor(); + DatagramSession s = r.createDatagramSession(null, clientSa, + new AbstractDatagramHandler() { + @Override + public void handleReceive(DatagramSession session, + InetSocketAddress src, ByteBuffer buf) { + System.out.println("client received '" + + Charset.defaultCharset().decode(buf) + + "' from " + src); + r.shutdown(); + } + }); + Thread.sleep(100); + s.send(ByteBuffer.wrap("hello".getBytes()), serverSa); + r.react(); } - }); - -// // Start the client. -// spawn(new Runnable() { -// public void run() { -// try { -// Reactor r = new Reactor(); -// ByteBuffer writeBuf = ByteBuffer.allocate(5); -// DatagramSession s = r.createDatagramSession(null, clientSa, handler); -// Thread.sleep(2000); -// s.send(writeBuf, clientSa); -// r.react(); -// } catch (Exception ex) { -// ex.printStackTrace(); -// } -// } -// }); - - // Start a second client. - spawn(new Runnable() { - public void run() { - try { - byte[] writeBuf = new byte[] { 0, 1, 2, 3 }; - Thread.sleep(3000); - new DatagramSocket().send(new DatagramPacket(writeBuf, - writeBuf.length, serverSa)); - } catch (Exception ex) { - ex.printStackTrace(); - } - } - }); + }; } + /** + * Run an echo server/client, in the same reactor. 
+ * + * @throws Exception + */ public void testTCP() throws Exception { InetAddress localhost = InetAddress.getLocalHost(); int serverPort = 11111; @@ -129,15 +138,23 @@ // CharBuffer charBuffer = decoder.decode(buf); // System.out.println("read: " + // charBuffer.toString()); - System.out.println("echoing: '" - + Charset.forName("us-ascii").decode(buf) + System.out.println("echoing '" + + Charset.defaultCharset().decode(buf) + "'"); session.write(buf); + session.close(); +// r.shutdown(); } catch (Exception ex) { throw new RuntimeException(ex); } } + @Override + public void handleClose(SocketSession session) + throws Exception { + System.out.println("server closed"); + } + }; } @@ -146,43 +163,40 @@ }); - r.schedule(new Runnable() { - public void run() { - try { - r.connect(serverSa, new AbstractStreamHandler() { + r.schedule(new MyRunnable() { + public void myrun() throws Exception { + r.connect(serverSa, new AbstractStreamHandler() { - @Override - public void handleConnect(StreamSession session) { - System.out.println("connected"); - try { - session.write(ByteBuffer.wrap("hello" - .getBytes())); - } catch (Exception ex) { - throw new RuntimeException(ex); - } - } + @Override + public void handleConnect(StreamSession session) throws Exception { + System.out.println("connected"); + session.write(ByteBuffer.wrap("hello".getBytes())); + } - @Override - public void handleRead(SocketSession session, - ByteBuffer buf) { - System.out.println("got back: '" - + Charset.forName("us-ascii").decode(buf) - + "'"); - r.shutdown(); - } + @Override + public void handleRead(SocketSession session, ByteBuffer buf) { + System.out + .println("got back: '" + + Charset.defaultCharset().decode( + buf) + "'"); + r.shutdown(); + } + + @Override + public void handleClose(SocketSession session) { + System.out.println("client closed"); + } - }); - } catch (Exception ex) { - throw new RuntimeException(ex); - } + }); } }, 1, TimeUnit.SECONDS); r.react(); } public static void main(String 
args[]) throws Exception { - new ReactorTest().testUDP(); - // new ReactorTest().testTCP(); + // new ReactorTest().testScheduler(); + // new ReactorTest().testUDP(); + new ReactorTest().testTCP(); } } Modified: java-reactor/trunk/src/reactor/SocketHandler.java =================================================================== --- java-reactor/trunk/src/reactor/SocketHandler.java 2008-10-08 22:33:44 UTC (rev 1010) +++ java-reactor/trunk/src/reactor/SocketHandler.java 2008-10-09 06:23:31 UTC (rev 1011) @@ -12,7 +12,8 @@ * @param buf * The read data. */ - public void handleRead(SocketSession session, ByteBuffer buf); + public void handleRead(SocketSession session, ByteBuffer buf) + throws Exception; /** * Handle the event where the socket is writable. @@ -20,6 +21,17 @@ * @param session * The session that is ready for writing. */ - public void handleWrite(SocketSession session); + public void handleWrite(SocketSession session) throws Exception; + /** + * Handle a socket close. For TCP sockets, either the remote or local socket + * was closed. For UDP sockets, only the local socket could have been closed + * (as there is no "connection"). + * + * @param session + * The session that was closed. 
+ * @throws Exception + */ + public void handleClose(SocketSession session) throws Exception; + } Modified: java-reactor/trunk/src/reactor/SocketSession.java =================================================================== --- java-reactor/trunk/src/reactor/SocketSession.java 2008-10-08 22:33:44 UTC (rev 1010) +++ java-reactor/trunk/src/reactor/SocketSession.java 2008-10-09 06:23:31 UTC (rev 1011) @@ -8,15 +8,58 @@ import java.util.Deque; public abstract class SocketSession { - + abstract protected SocketHandler getHandler(); + abstract protected ByteChannel getChannel(); + abstract protected SelectionKey getKey(); - + private final ByteBuffer readBuf = ByteBuffer.allocateDirect(4096); private final Deque<ByteBuffer> pendingWrites = new ArrayDeque<ByteBuffer>(); /** + * Meant to be overriden in subclasses to support alternative read + * procedures. Performs the actual reading and callback. If we get an + * exception from the handler, then just print it and keep on trucking. + * + * @param key + * The selection key for this channel. + * @param readBuf + * The buffer to read into and pass to the handler. + * @return Whether the read loop should continue to iterate. + * @throws IOException + */ + protected boolean read(SelectionKey key, ByteBuffer readBuf) + throws IOException { + try { + int numRead = getChannel().read(readBuf); + if (numRead == -1) { + // Remote entity shut the socket down cleanly. Do + // the same from our end and cancel the channel. + close(); + return false; + } + if (numRead == 0) + return false; + } catch (IOException ex) { + close(); + return false; + } + + // after channel wrote to buf, set lim = pos, then pos = + // 0 + readBuf.flip(); + // callback + try { + getHandler().handleRead(this, readBuf); + } catch (Exception ex) { + ex.printStackTrace(); + } + return true; + } + + /** * This is called by the reactor when there is a message to be received. * Reading messages is the priority, so this is done before anything else. 
* @@ -26,33 +69,9 @@ * @throws Exception */ void handleRead(SelectionKey key) throws Exception { - while (true) { - try { - int numRead = getChannel().read(readBuf); - if (numRead == -1) { - // Remote entity shut the socket down cleanly. Do - // the same from our end and cancel the channel. - getChannel().close(); - key.cancel(); - break; - } - if (numRead == 0) - break; - - // after channel wrote to buf, set lim = pos, then pos = - // 0 - readBuf.flip(); - // callback - getHandler().handleRead(this, readBuf); - // recycle buffer - readBuf.clear(); - } catch (IOException e) { - // The remote forcibly closed the connection, cancel - // the selection key and close the channel. - key.cancel(); - getChannel().close(); - } - } + while (read(key, readBuf)) + // recycle buffer + readBuf.clear(); } /** @@ -71,7 +90,8 @@ ByteBuffer buf = (ByteBuffer) pendingWrites.getFirst(); int initPos = buf.position(); int bytes = getChannel().write(buf); - assert buf.remaining() == buf.limit() - bytes && buf.position() == initPos + bytes; + assert buf.remaining() == buf.limit() - bytes + && buf.position() == initPos + bytes; if (buf.remaining() > 0) { // ... or the socket's buffer fills up break; @@ -103,8 +123,8 @@ buf.rewind(); int initPos = buf.position(); int bytes = getChannel().write(buf); - System.out.println("writing " + bytes); - assert buf.remaining() == buf.limit() - bytes && buf.position() == initPos + bytes; + assert buf.remaining() == buf.limit() - bytes + && buf.position() == initPos + bytes; if (buf.remaining() > 0) { pendingWrites.add(buf); // We're now interested in when the socket is ready for writing. @@ -113,12 +133,21 @@ } /** - * Close the underlying socket. + * Close the underlying socket and call the handleClose() method. If the + * handler throws an Exception, just print it. 
* * @throws Exception */ - public void close() throws Exception { - getChannel().close(); + public void close() throws IOException { + if (getChannel().isOpen()) { + getKey().cancel(); + getChannel().close(); + try { + getHandler().handleClose(this); + } catch (Exception ex) { + ex.printStackTrace(); + } + } } } Modified: java-reactor/trunk/src/reactor/StreamHandler.java =================================================================== --- java-reactor/trunk/src/reactor/StreamHandler.java 2008-10-08 22:33:44 UTC (rev 1010) +++ java-reactor/trunk/src/reactor/StreamHandler.java 2008-10-09 06:23:31 UTC (rev 1011) @@ -10,6 +10,6 @@ * @param channel * The channel. */ - public void handleConnect(StreamSession session); + public void handleConnect(StreamSession session) throws Exception; } Modified: java-reactor/trunk/src/reactor/StreamSession.java =================================================================== --- java-reactor/trunk/src/reactor/StreamSession.java 2008-10-08 22:33:44 UTC (rev 1010) +++ java-reactor/trunk/src/reactor/StreamSession.java 2008-10-09 06:23:31 UTC (rev 1011) @@ -53,7 +53,11 @@ assert this.key == key; boolean result = channel.finishConnect(); assert result; - handler.handleConnect(this); + try { + handler.handleConnect(this); + } catch (Exception ex) { + ex.printStackTrace(); + } key.interestOps(SelectionKey.OP_READ); } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <yan...@us...> - 2008-10-08 22:33:53
|
Revision: 1010
          http://assorted.svn.sourceforge.net/assorted/?rev=1010&view=rev
Author:   yangzhang
Date:     2008-10-08 22:33:44 +0000 (Wed, 08 Oct 2008)

Log Message:
-----------
added a short description

Added Paths:
-----------
    netio-bench/trunk/README

Added: netio-bench/trunk/README
===================================================================
--- netio-bench/trunk/README                            (rev 0)
+++ netio-bench/trunk/README    2008-10-08 22:33:44 UTC (rev 1010)
@@ -0,0 +1,6 @@
+Overview
+--------
+
+This is a simple echo server. Once a connection is accepted, it waits for 40
+bytes, echoes all the data it has received back to the client, then closes the
+socket.
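The buffering rule the README describes (collect data until 40 bytes have arrived, then echo everything received) can be sketched roughly as follows. This is only an illustration, not code from netio-bench: `echo_buffer` and `demo_echo` are hypothetical names, and the socket I/O is replaced by in-memory chunks.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical helper (not part of netio-bench): buffers incoming chunks
// until at least `threshold` bytes have arrived.
class echo_buffer {
 public:
  explicit echo_buffer(std::size_t threshold) : threshold_(threshold) {}

  // Append a chunk; returns true once enough data has arrived to echo.
  bool feed(const char* data, std::size_t len) {
    buf_.append(data, len);
    return buf_.size() >= threshold_;
  }

  // Everything received so far; this is what would be written back.
  const std::string& contents() const { return buf_; }

 private:
  std::size_t threshold_;
  std::string buf_;
};

// Feeds a 25-byte chunk, then a 20-byte chunk, and returns the reply.
std::string demo_echo() {
  echo_buffer b(40);
  bool ready = b.feed("0123456789012345678901234", 25);
  assert(!ready);  // 25 < 40: keep waiting for more data
  ready = b.feed("01234567890123456789", 20);
  assert(ready);   // 45 >= 40: echo everything, then close
  return b.contents();
}
```

Note that the reply contains all buffered data, not just the first 40 bytes, which matches the "echoes all the data it has received" wording.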
From: <yan...@us...> - 2008-10-08 22:07:26
|
Revision: 1009 http://assorted.svn.sourceforge.net/assorted/?rev=1009&view=rev Author: yangzhang Date: 2008-10-08 22:07:21 +0000 (Wed, 08 Oct 2008) Log Message: ----------- made the benchmark into an echo client/server Modified Paths: -------------- netio-bench/trunk/src/Makefile netio-bench/trunk/src/epoll.cc Modified: netio-bench/trunk/src/Makefile =================================================================== --- netio-bench/trunk/src/Makefile 2008-10-08 22:05:15 UTC (rev 1008) +++ netio-bench/trunk/src/Makefile 2008-10-08 22:07:21 UTC (rev 1009) @@ -1,9 +1,14 @@ -all: epoll +all: run +run: + rm -f epoll + g++ -I../../../cpp-commons/trunk/src/ -Wall -O3 -o epoll epoll.cc + ./epoll + epoll: epoll.cc g++ -I../../../cpp-commons/trunk/src/ -Wall -O3 -o $@ $< clean: rm -f epoll -.PHONY: clean +.PHONY: clean run Modified: netio-bench/trunk/src/epoll.cc =================================================================== --- netio-bench/trunk/src/epoll.cc 2008-10-08 22:05:15 UTC (rev 1008) +++ netio-bench/trunk/src/epoll.cc 2008-10-08 22:07:21 UTC (rev 1009) @@ -7,97 +7,165 @@ #include <commons/check.h> #include <commons/closing.h> +#include <commons/pool.h> #include <commons/sockets.h> +#include <iostream> using namespace commons; using namespace std; -/** - * Read data from the given file descriptor until we would block (EAGAIN) or we - * hit EOF/an error. - * \return true if we hit EAGAIN, false on EOF or unexpected error. - */ -static bool -consume(int fd) { - while (true) { - char buf[1024]; - int bytes = read(fd, buf, sizeof buf); - if (bytes == -1) { - if (errno == EAGAIN) { - return true; - } else { - perror("read"); - return false; +class echoer { + public: + /** + * \return true iff we are not done with the reading/would've blocked + * (EAGAIN), false iff we've gotten the full 40-byte packet or have hit + * EOF/an error. 
+ */ + bool consume() { + while (true) { + char buf[1024]; + int bytes = ::read(fd_, buf, sizeof buf); + if (bytes == -1) { + // We're going to block. + if (errno == EAGAIN) { + return true; + } else { + perror("read"); + return false; + } + } + if (bytes == 0) { + return false; + } + ss_ << string(buf, bytes); + if (ss_.tellp() >= 10) + return false; } } - if (bytes == 0) { - return false; - } - // Write the data to stdout - checknneg(write(1, buf, bytes) == -1); - } -} + /** + * Read the contents of the buffer as a string. + */ + string read() { return ss_.str(); } + /** + * The socket file descriptor we're currently associated with. + */ + int & fd() { return fd_; } + int fd() const { return fd_; } + + private: + stringstream ss_; + int fd_; +}; + int main(int argc, char* argv[]) { + // Create a non-blocking server socket. int server = tcp_listen(8080, true); // Make sure the fd is finally closed. closingfd closer(server); - // Create our epoll file descriptor - const int max_events = 16; + // Create our epoll file descriptor. max_events is the maximum number of + // events to process at a time (max number of events that we want a call to + // epoll_wait() to "return"), while max_echoers is the max number of + // connections to make. + const int max_events = 16, max_echoers = 100; + + // This file descriptor isn't actually bound to any socket; it's a special fd + // that is really just used for manipulating the epoll (e.g., registering + // more sockets/connections with it). TODO: Figure out the rationale behind + // why this thing is an fd. int epoll_fd = checknneg(epoll_create(max_events)); - // Add our server fd to the epoll event loop + // Add our server fd to the epoll event loop. The event specifies: + // + // - what fd is + // - what operations we're interested in (connections, hangups, errors) + // (TODO: what are hangups?) 
+ // - arbitrary data to be associated with this fd, in the form of a pointer + // (ptr) or number (u32/u64); this is more useful for connection fd's, of + // which there are multiple, and so it helps to have a direct pointer to + // (say) that connection's handler. + // + // The add operation actually makes a copy of the given epoll_event, so + // that's why we can reuse this `event` later. struct epoll_event event; event.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLET; event.data.fd = server; checknneg(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server, &event)); - // Execute the epoll event loop + // Set up a bunch of echo server instances. + pool<echoer> echoers(max_echoers); + + // Execute the epoll event loop. while (true) { struct epoll_event events[max_events]; int num_fds = epoll_wait(epoll_fd, events, max_events, -1); for (int i = 0; i < num_fds; i++) { - // Case 1: Error condition + // Case 1: Error condition. if (events[i].events & (EPOLLHUP | EPOLLERR)) { fputs("epoll: EPOLLERR", stderr); + // epoll will remove the fd from its set automatically when the fd is + // closed. close(events[i].data.fd); - continue; - } - check(events[i].events & EPOLLIN); + } else { + check(events[i].events & EPOLLIN); - // Case 2: Our server is receiving a connection - if (events[i].data.fd == server) { - struct sockaddr remote_addr; - socklen_t addr_size = sizeof(remote_addr); - int connection = accept(server, &remote_addr, &addr_size); - if (connection == -1) { - if (errno != EAGAIN && errno != EWOULDBLOCK) { - perror("accept"); + // Case 2: Our server is receiving a connection. + if (events[i].data.fd == server) { + struct sockaddr remote_addr; + socklen_t addr_size = sizeof remote_addr; + int connection = accept(server, &remote_addr, &addr_size); + if (connection == -1) { + if (errno != EAGAIN && errno != EWOULDBLOCK) { + perror("accept"); + } + continue; } - continue; + + // Make the connection non-blocking. 
+ checknneg(fcntl(connection, F_SETFL, + O_NONBLOCK | fcntl(connection, F_GETFL, 0))); + + // Add the connection to our epoll loop. Note we are reusing our + // epoll_event. Now we're actually using the ptr field to point to a + // free handler. event.data is a union of {ptr, fd, ...}, so we can + // only use one of these. event.data is entirely for the user; epoll + // doesn't actually look at this. Note that we're passing the fd + // (connection) separately into epoll_ctl(). + echoer *e = echoers.take(); + cout << "got a connection! " << + echoers.size() << " echoers remaining" << endl; + event.data.ptr = e; + e->fd() = connection; + checknneg(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, connection, + &event)); } - // Make the connection non-blocking - checknneg(fcntl(connection, F_SETFL, - O_NONBLOCK | fcntl(connection, F_GETFL, 0))); + // Case 3: One of our connections has read data. + else { + echoer &e = *((echoer*) events[i].data.ptr); + // If we have read the minimum amount (or encountered a dead-end + // situation), then echo the data back. + if (!e.consume()) { + // Write back! + string s = e.read(); + check((size_t) checknneg(write(e.fd(), s.c_str(), s.size())) == s.size()); - // Add the connection to our epoll loop - event.data.fd = connection; - checknneg(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, connection, - &event)); - continue; - } + // epoll will remove the fd from its set automatically when the fd is + // closed. + close(e.fd()); - // Case 3: One of our connections has read data - if (!consume(events[i].data.fd)) { - // epoll will remove the fd from its set - // automatically when the fd is closed - close(events[i].data.fd); + // Release the echoer. + echoers.drop(&e); + + cout << "responded with '" << e.read() << "'; " << + echoers.size() << " echoers remaining" << endl; + } + } } } } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
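The comments in this diff note that `event.data` is a union reserved for the caller and that epoll hands it back untouched. A small Linux-only sketch of that round trip, using a pipe instead of a socket so it can run standalone, might look like this. The `handler` struct and `demo_epoll_ptr` are hypothetical stand-ins for the `echoer` class, not code from the commit.

```cpp
#include <cassert>
#include <sys/epoll.h>
#include <unistd.h>

// Hypothetical per-connection state, standing in for the echoer class.
struct handler {
  int fd;
  int id;
};

// Registers the read end of a pipe with epoll, attaches a handler through
// data.ptr, makes the pipe readable, and returns the id of the handler that
// epoll_wait() hands back. Returns -1 on any failure.
int demo_epoll_ptr() {
  int fds[2];
  if (pipe(fds) == -1) return -1;

  int epoll_fd = epoll_create(1);
  if (epoll_fd == -1) {
    close(fds[0]);
    close(fds[1]);
    return -1;
  }

  handler h = { fds[0], 42 };

  struct epoll_event event = {};
  event.events = EPOLLIN;
  event.data.ptr = &h;  // data is a union: set ptr *or* fd, not both

  int result = -1;
  // The fd being watched is passed separately; epoll copies `event`.
  if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fds[0], &event) == 0 &&
      write(fds[1], "x", 1) == 1) {
    struct epoll_event ready[1];
    int n = epoll_wait(epoll_fd, ready, 1, 1000);
    if (n == 1) {
      handler* got = static_cast<handler*>(ready[0].data.ptr);
      assert(got == &h);  // the same pointer comes back unchanged
      result = got->id;
    }
  }

  close(epoll_fd);
  close(fds[0]);
  close(fds[1]);
  return result;
}
```

Because the pointer is the only thing epoll returns, the handler has to carry its own fd, which is presumably why `echoer` stores `fd_`.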
From: <yan...@us...> - 2008-10-08 22:05:17
|
Revision: 1008
          http://assorted.svn.sourceforge.net/assorted/?rev=1008&view=rev
Author:   yangzhang
Date:     2008-10-08 22:05:15 +0000 (Wed, 08 Oct 2008)

Log Message:
-----------
added: pool, array

Added Paths:
-----------
    cpp-commons/trunk/src/commons/array.h
    cpp-commons/trunk/src/commons/pool.h

Added: cpp-commons/trunk/src/commons/array.h
===================================================================
--- cpp-commons/trunk/src/commons/array.h                               (rev 0)
+++ cpp-commons/trunk/src/commons/array.h       2008-10-08 22:05:15 UTC (rev 1008)
@@ -0,0 +1,33 @@
+#ifndef COMMONS_ARRAY_H
+#define COMMONS_ARRAY_H
+
+#include <boost/scoped_array.hpp>
+
+namespace commons {
+
+  using namespace boost;
+
+  /**
+   * A thin wrapper around arrays. Like a fixed-size vector. Unlike array
+   * since the size can be dynamically determined.
+   */
+  template<typename T>
+  class array : public scoped_array<T> {
+    public:
+      explicit array(size_t n) : scoped_array<T>(new T[n]), n_(n) {}
+      size_t size() { return n_; }
+    private:
+      size_t n_;
+  };
+
+  //template<typename T>
+  //  class array {
+  //    public:
+  //      explicit array(int n) : a(new T[n]) {}
+  //    private:
+  //      scoped_array<T> a;
+  //  };
+
+}
+
+#endif

Added: cpp-commons/trunk/src/commons/pool.h
===================================================================
--- cpp-commons/trunk/src/commons/pool.h                                (rev 0)
+++ cpp-commons/trunk/src/commons/pool.h        2008-10-08 22:05:15 UTC (rev 1008)
@@ -0,0 +1,63 @@
+#ifndef COMMONS_POOL_H
+#define COMMONS_POOL_H
+
+#include <boost/circular_buffer.hpp>
+#include <commons/array.h>
+
+namespace commons {
+
+  using namespace boost;
+
+  /**
+   * A fixed-size pool of resources.
+   */
+  template<typename T>
+  class pool {
+    public:
+      /**
+       * Create a pool of size \p size. These resources are created and stored
+       * in an array.
+       * \param[in] size The number of elements to create.
+       */
+      pool(int size) : xs(size), ps(size) {
+        for (size_t i = 0; i < xs.size(); i++)
+          ps.push_back(&xs[i]);
+      }
+
+      /**
+       * Take an item from the pool.
+       * \throw exception The pool is empty.
+       */
+      T *take() {
+        check(!ps.empty(), "taking from empty pool");
+        T *p = ps[0];
+        ps.pop_front();
+        return p;
+      }
+
+      /**
+       * Release an item back into the pool. This doesn't check that the pointer
+       * being released was one that was originally taken from this pool.
+       * \param[in] p The pointer to release.
+       * \throw exception The pool is full.
+       */
+      void drop(T *p) {
+        check(ps.size() < ps.max_size(), "dropping into full pool");
+        ps.push_back(p);
+      }
+
+      /**
+       * Get the number of (remaining) items in the pool.
+       */
+      size_t size() {
+        return ps.size();
+      }
+
+    private:
+      array<T> xs;
+      circular_buffer<T*> ps;
+  };
+
+}
+
+#endif
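The take/drop behaviour of the pool can be tried out without Boost or the commons headers. The following is a rough standard-library sketch of the same idea, not the `commons::pool` class itself: `simple_pool` and `demo_pool` are hypothetical names, `std::vector` and `std::deque` are swapped in for `array` and `circular_buffer`, and exceptions replace the `check` macro.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <stdexcept>
#include <vector>

// Standard-library sketch of a fixed-size pool; `simple_pool` is a
// hypothetical name, not the commons::pool class.
template <typename T>
class simple_pool {
 public:
  explicit simple_pool(std::size_t size) : items_(size) {
    for (std::size_t i = 0; i < items_.size(); ++i)
      free_.push_back(&items_[i]);
  }

  // Hand out the next free item; throws if none are left.
  T* take() {
    if (free_.empty()) throw std::runtime_error("taking from empty pool");
    T* p = free_.front();
    free_.pop_front();
    return p;
  }

  // Return an item; like the original, this does not verify that `p`
  // actually came from this pool.
  void drop(T* p) {
    if (free_.size() >= items_.size())
      throw std::runtime_error("dropping into full pool");
    free_.push_back(p);
  }

  // Number of items currently available.
  std::size_t size() const { return free_.size(); }

 private:
  std::vector<T> items_;  // owns the objects; never resized after construction
  std::deque<T*> free_;   // pointers to the currently available objects
};

// Takes both items from a two-item pool, returns one, and reports how many
// are available afterwards.
std::size_t demo_pool() {
  simple_pool<int> pool(2);
  int* a = pool.take();
  int* b = pool.take();
  assert(a != b);
  assert(pool.size() == 0);
  pool.drop(a);
  return pool.size();
}
```

One caveat of this sketch: copying a `simple_pool` would leave the copy's free list pointing into the original's storage, so a real version would want to disable copying.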
From: <yan...@us...> - 2008-10-08 19:51:20
|
Revision: 1007
          http://assorted.svn.sourceforge.net/assorted/?rev=1007&view=rev
Author:   yangzhang
Date:     2008-10-08 19:51:10 +0000 (Wed, 08 Oct 2008)

Log Message:
-----------
demo of how to inherit template classes

Added Paths:
-----------
    sandbox/trunk/src/cc/template_inheritance.cc

Added: sandbox/trunk/src/cc/template_inheritance.cc
===================================================================
--- sandbox/trunk/src/cc/template_inheritance.cc                                (rev 0)
+++ sandbox/trunk/src/cc/template_inheritance.cc        2008-10-08 19:51:10 UTC (rev 1007)
@@ -0,0 +1,3 @@
+template <typename T> class A {};
+template <typename T> class B : public A<T> {};
+int main() { return 0; }
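One thing the empty classes in this demo do not exercise is name lookup into a templated base. A slightly larger sketch, with hypothetical names `base`, `derived` and `demo_inherit` that are not from the sandbox, shows the usual `this->` qualification:

```cpp
#include <cassert>

template <typename T>
class base {
 public:
  T value() const { return T(7); }
};

template <typename T>
class derived : public base<T> {
 public:
  // A plain `value()` here is rejected by conforming compilers: members of a
  // dependent base class are only found when the name is qualified with
  // `this->` or `base<T>::`.
  T twice() const { return this->value() + this->value(); }
};

// Calls an inherited member directly and through the derived class.
int demo_inherit() {
  derived<int> d;
  assert(d.value() == 7);  // inherited members are visible from outside as usual
  return d.twice();
}
```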
From: <yan...@us...> - 2008-10-08 19:45:32
|
Revision: 1006
          http://assorted.svn.sourceforge.net/assorted/?rev=1006&view=rev
Author:   yangzhang
Date:     2008-10-08 19:45:23 +0000 (Wed, 08 Oct 2008)

Log Message:
-----------
added demo of using scoped arrays

Added Paths:
-----------
    sandbox/trunk/src/cc/smartptrs.cc

Added: sandbox/trunk/src/cc/smartptrs.cc
===================================================================
--- sandbox/trunk/src/cc/smartptrs.cc                           (rev 0)
+++ sandbox/trunk/src/cc/smartptrs.cc   2008-10-08 19:45:23 UTC (rev 1006)
@@ -0,0 +1,6 @@
+#include <boost/scoped_array.hpp>
+using namespace boost;
+int main() {
+  scoped_array<int> a(new int[5]);
+  return 0;
+}
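For readers without Boost at hand, the same scoped-ownership idea can be sketched with the C++11 standard library. This swaps in `std::unique_ptr<int[]>` for `boost::scoped_array<int>`; it is an analogue, not what the commit uses, and `demo_sum` is a hypothetical name.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>

// Fills a heap array with 1..n and returns the sum. The array is released
// with delete[] automatically when `a` goes out of scope.
int demo_sum(std::size_t n) {
  std::unique_ptr<int[]> a(new int[n]);
  for (std::size_t i = 0; i < n; ++i)
    a[i] = static_cast<int>(i) + 1;

  int sum = 0;
  for (std::size_t i = 0; i < n; ++i)
    sum += a[i];
  return sum;
}
```

Like `scoped_array`, `std::unique_ptr<T[]>` cannot be copied and does not remember its length, which is the gap the `commons::array` wrapper fills by storing `n_`.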
From: <yan...@us...> - 2008-10-08 19:32:22
|
Revision: 1005
          http://assorted.svn.sourceforge.net/assorted/?rev=1005&view=rev
Author:   yangzhang
Date:     2008-10-08 19:32:11 +0000 (Wed, 08 Oct 2008)

Log Message:
-----------
demo of template syntax

Added Paths:
-----------
    sandbox/trunk/src/cc/template.cc

Added: sandbox/trunk/src/cc/template.cc
===================================================================
--- sandbox/trunk/src/cc/template.cc                            (rev 0)
+++ sandbox/trunk/src/cc/template.cc    2008-10-08 19:32:11 UTC (rev 1005)
@@ -0,0 +1,10 @@
+template<typename T>
+class C {
+  public:
+    int f() { return 0; }
+};
+int main() {
+  C<int> c;
+  c.f();
+  return 0;
+}
From: <yan...@us...> - 2008-10-08 11:22:22
|
Revision: 1004 http://assorted.svn.sourceforge.net/assorted/?rev=1004&view=rev Author: yangzhang Date: 2008-10-08 08:41:08 +0000 (Wed, 08 Oct 2008) Log Message: ----------- completely refactored reactor to distinguish among the various session and handler types for the various types of channels/sockets (tcp listener, tcp connection, udp); got the tcp test working again, but the udp tests are still borked. Modified Paths: -------------- java-reactor/trunk/src/reactor/Reactor.java java-reactor/trunk/src/reactor/ReactorTest.java Added Paths: ----------- java-reactor/trunk/src/reactor/AbstractDatagramHandler.java java-reactor/trunk/src/reactor/AbstractStreamHandler.java java-reactor/trunk/src/reactor/DatagramHandler.java java-reactor/trunk/src/reactor/DatagramSession.java java-reactor/trunk/src/reactor/ListenerHandler.java java-reactor/trunk/src/reactor/ListenerSession.java java-reactor/trunk/src/reactor/SocketHandler.java java-reactor/trunk/src/reactor/SocketSession.java java-reactor/trunk/src/reactor/StreamHandler.java java-reactor/trunk/src/reactor/StreamSession.java Removed Paths: ------------- java-reactor/trunk/src/reactor/AbstractHandler.java java-reactor/trunk/src/reactor/ReactorHandler.java java-reactor/trunk/src/reactor/Session.java java-reactor/trunk/src/reactor/SessionType.java Added: java-reactor/trunk/src/reactor/AbstractDatagramHandler.java =================================================================== --- java-reactor/trunk/src/reactor/AbstractDatagramHandler.java (rev 0) +++ java-reactor/trunk/src/reactor/AbstractDatagramHandler.java 2008-10-08 08:41:08 UTC (rev 1004) @@ -0,0 +1,27 @@ +package reactor; + +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; + +/** + * Provides default (no-op) handler methods for all events. 
+ * + * @author yang + * + */ +public class AbstractDatagramHandler implements DatagramHandler { + + @Override + public void handleReceive(DatagramSession session, InetSocketAddress src, + ByteBuffer buf) { + } + + @Override + public void handleRead(SocketSession session, ByteBuffer buf) { + } + + @Override + public void handleWrite(SocketSession session) { + } + +} Deleted: java-reactor/trunk/src/reactor/AbstractHandler.java =================================================================== --- java-reactor/trunk/src/reactor/AbstractHandler.java 2008-10-08 03:34:52 UTC (rev 1003) +++ java-reactor/trunk/src/reactor/AbstractHandler.java 2008-10-08 08:41:08 UTC (rev 1004) @@ -1,29 +0,0 @@ -package reactor; - -import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.nio.channels.SocketChannel; - -/** - * Provides default (no-op) handler methods for all events. - * - * @author yang - * - */ -public class AbstractHandler implements ReactorHandler { - - @Override - public void handleAccept(Session listenerSession, SocketChannel src, - Session clientSession) { - } - - @Override - public void handleRead(Session session, InetSocketAddress src, - ByteBuffer buf) { - } - - @Override - public void handleConnect(Session session) { - } - -} Added: java-reactor/trunk/src/reactor/AbstractStreamHandler.java =================================================================== --- java-reactor/trunk/src/reactor/AbstractStreamHandler.java (rev 0) +++ java-reactor/trunk/src/reactor/AbstractStreamHandler.java 2008-10-08 08:41:08 UTC (rev 1004) @@ -0,0 +1,25 @@ +package reactor; + +import java.nio.ByteBuffer; + +/** + * Provides default (no-op) handler methods for all events. 
+ * + * @author yang + * + */ +public class AbstractStreamHandler implements StreamHandler { + + @Override + public void handleRead(SocketSession session, ByteBuffer buf) { + } + + @Override + public void handleWrite(SocketSession session) { + } + + @Override + public void handleConnect(StreamSession session) { + } + +} Added: java-reactor/trunk/src/reactor/DatagramHandler.java =================================================================== --- java-reactor/trunk/src/reactor/DatagramHandler.java (rev 0) +++ java-reactor/trunk/src/reactor/DatagramHandler.java 2008-10-08 08:41:08 UTC (rev 1004) @@ -0,0 +1,29 @@ +package reactor; + +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; + +/** + * Handler for events pertaining to a socket (session). We separate sessions + * from handlers because we may choose to have multiple sessions all pass their + * events to the same handler. + * + * @author yang + * + */ +public interface DatagramHandler extends SocketHandler { + + /** + * Handle a promiscuously received packet. + * + * @param session + * The session (socket) at which the packet was received. + * @param src + * The sender's socket address. + * @param buf + * The received packet. 
+ */ + public void handleReceive(DatagramSession session, InetSocketAddress src, + ByteBuffer buf); + +} Added: java-reactor/trunk/src/reactor/DatagramSession.java =================================================================== --- java-reactor/trunk/src/reactor/DatagramSession.java (rev 0) +++ java-reactor/trunk/src/reactor/DatagramSession.java 2008-10-08 08:41:08 UTC (rev 1004) @@ -0,0 +1,89 @@ +package reactor; + +import java.net.DatagramSocket; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.ByteChannel; +import java.nio.channels.DatagramChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; + +/** + * Represents a "session," which is probably a bad name for what is basically a + * wrapper around a TCP or UDP socket. Encapsulates some extra state information + * associated with the socket. + * + * @author yang + * + */ +public class DatagramSession extends SocketSession { + + private final DatagramChannel channel; + private final DatagramHandler handler; + private final SelectionKey key; + + @Override + protected ByteChannel getChannel() { + return channel; + } + + @Override + protected SocketHandler getHandler() { + return handler; + } + + @Override + protected SelectionKey getKey() { + return key; + } + + /** + * Construct a new Session object. + * + * @param selector + * The selector that is used in the current reactor. + * @param remoteSa + * If non-null, create a restricted datagram socket, which has + * better performance but occupies a file descriptor. + * @param localSa + * The local address and port to send/receive messages on, or + * null for auto port selection. + * @param handler + * The handler for events on this socket. 
+ */ + DatagramSession(Selector selector, InetSocketAddress remoteSa, + InetSocketAddress localSa, DatagramHandler handler) + throws Exception { + DatagramChannel ch = DatagramChannel.open(); + ch.configureBlocking(false); + DatagramSocket socket = ch.socket(); + socket.setReuseAddress(true); + if (localSa != null) + socket.bind(localSa); + if (remoteSa != null) + ch.connect(remoteSa); + + key = ch.register(selector, SelectionKey.OP_READ, this); + channel = ch; + this.handler = handler; + } + + /** + * Send messages from this socket, in promiscuous mode. This does not + * enqueue anything onto the internal pendingWrites buffer if the socket's + * buffer is full, but instead triggers an assertion failure. + * + * @param writeBuf + * The packet to be sent. + * @param dst + * The destination socket address. + * @throws Exception + */ + public void send(ByteBuffer writeBuf, InetSocketAddress dst) + throws Exception { + int bytes = ((DatagramChannel) channel).send(writeBuf, dst); + // TODO: don't trigger an assertion failure! + assert bytes == writeBuf.limit(); + } + +} Added: java-reactor/trunk/src/reactor/ListenerHandler.java =================================================================== --- java-reactor/trunk/src/reactor/ListenerHandler.java (rev 0) +++ java-reactor/trunk/src/reactor/ListenerHandler.java 2008-10-08 08:41:08 UTC (rev 1004) @@ -0,0 +1,24 @@ +package reactor; + +import java.nio.channels.SocketChannel; + +public interface ListenerHandler { + + /** + * Handle a newly accepted TCP connection. + * + * @param clientChannel + * The client channel. + * + * @return The StreamHandler to handle events of the new connection. + */ + public StreamHandler handleAccept(SocketChannel clientChannel); + + /** + * Handle exceptions that occurred while accepting. + * + * @param ex The exception. 
+ */ + public void handleException(Exception ex); + +} Added: java-reactor/trunk/src/reactor/ListenerSession.java =================================================================== --- java-reactor/trunk/src/reactor/ListenerSession.java (rev 0) +++ java-reactor/trunk/src/reactor/ListenerSession.java 2008-10-08 08:41:08 UTC (rev 1004) @@ -0,0 +1,58 @@ +package reactor; + +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; + +public class ListenerSession { + + private final ServerSocketChannel channel; + private final ListenerHandler handler; + private final Selector selector; + + ListenerSession(Selector selector, InetSocketAddress localSa, + ListenerHandler handler) throws Exception { + ServerSocketChannel ch = ServerSocketChannel.open(); + ch.configureBlocking(false); + ServerSocket socket = ch.socket(); + socket.setReuseAddress(true); + socket.bind(localSa); + ch.register(selector, SelectionKey.OP_ACCEPT, this); + channel = ch; + this.handler = handler; + this.selector = selector; + } + + /** + * Handle a newly accepted TCP connection. + * + * @param key + * The selection key. + */ + public void handleAccept(SelectionKey key) { + while (true) { + try { + SocketChannel c = channel.accept(); + if (c == null) + break; + c.configureBlocking(false); + new StreamSession(selector, c, handler.handleAccept(c)); + } catch (Exception ex) { + handler.handleException(ex); + } + } + } + + /** + * Close the underlying socket. 
+ * + * @throws Exception + */ + public void close() throws Exception { + channel.close(); + } + +} Modified: java-reactor/trunk/src/reactor/Reactor.java =================================================================== --- java-reactor/trunk/src/reactor/Reactor.java 2008-10-08 03:34:52 UTC (rev 1003) +++ java-reactor/trunk/src/reactor/Reactor.java 2008-10-08 08:41:08 UTC (rev 1004) @@ -26,7 +26,15 @@ } /** - * Register a new session (i.e. socket). + * Register a new TCP server session. + */ + public ListenerSession registerStreamServer(InetSocketAddress localSa, + ListenerHandler handler) { + return null; + } + + /** + * Register a new datagram session (i.e. socket). * * @param remoteSa * If non-null, create a restricted datagram socket, which has @@ -38,14 +46,42 @@ * The handler for events on this socket. * @return */ - public Session register(SessionType type, InetSocketAddress remoteSa, - InetSocketAddress localSa, ReactorHandler handler) { - Session session = new Session(type, remoteSa, localSa, handler, - selector); - return session; + public DatagramSession createDatagramSession(InetSocketAddress remoteSa, + InetSocketAddress localSa, DatagramHandler handler) + throws Exception { + return new DatagramSession(selector, remoteSa, localSa, handler); } /** + * Create a stream session that is connecting to the given address. + * + * @param remoteSa + * The remote socket address to connect to. + * @param handler + * The event handler for this connection. + * @throws Exception + */ + public void connect(InetSocketAddress remoteSa, StreamHandler handler) + throws Exception { + new StreamSession(selector, remoteSa, handler); + } + + /** + * Create a stream session that listens on the specified local address for + * connections to accept. + * + * @param localSa + * The local socket address to bind to. + * @param handler + * The event handler for this listener. 
+ * @throws Exception + */ + public void listen(InetSocketAddress localSa, ListenerHandler handler) + throws Exception { + new ListenerSession(selector, localSa, handler); + } + + /** * The main reactor loop. This runs until the shutdown method is called. * * @throws Exception @@ -68,13 +104,15 @@ for (SelectionKey key : keys) { if (key.isValid()) { if (key.isReadable()) { - ((Session) key.attachment()).handleRead(key); + ((SocketSession) key.attachment()).handleRead(key); } else if (key.isWritable()) { - ((Session) key.attachment()).handleWrite(key); + ((SocketSession) key.attachment()).handleWrite(key); } else if (key.isConnectable()) { - ((Session) key.attachment()).handleConnect(key); + ((StreamSession) key.attachment()) + .handleConnect(key); } else if (key.isAcceptable()) { - ((Session) key.attachment()).handleRead(key); + ((ListenerSession) key.attachment()) + .handleAccept(key); } } } Deleted: java-reactor/trunk/src/reactor/ReactorHandler.java =================================================================== --- java-reactor/trunk/src/reactor/ReactorHandler.java 2008-10-08 03:34:52 UTC (rev 1003) +++ java-reactor/trunk/src/reactor/ReactorHandler.java 2008-10-08 08:41:08 UTC (rev 1004) @@ -1,51 +0,0 @@ -package reactor; - -import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.nio.channels.SocketChannel; - -/** - * Handler for events pertaining to a socket (session). - * - * @author yang - * - */ -public interface ReactorHandler { - - /** - * Handle a received packet. - * - * @param session - * The session (socket) at which the packet was received. - * @param src - * The sender's socket address. - * @param buf - * The received packet. - */ - public void handleRead(Session session, InetSocketAddress src, - ByteBuffer buf); - - /** - * Handle a newly accepted TCP connection. - * - * @param listenerSession - * The server session. - * @param channel - * The client channel. - * @param clientSession - * The client session. 
- */ - public void handleAccept(Session listenerSession, SocketChannel channel, - Session clientSession); - - /** - * Handle a successful outgoing TCP connection (we're the client). - * - * @param session - * The session for this socket. - * @param channel - * The channel. - */ - public void handleConnect(Session session); - -} Modified: java-reactor/trunk/src/reactor/ReactorTest.java =================================================================== --- java-reactor/trunk/src/reactor/ReactorTest.java 2008-10-08 03:34:52 UTC (rev 1003) +++ java-reactor/trunk/src/reactor/ReactorTest.java 2008-10-08 08:41:08 UTC (rev 1004) @@ -51,18 +51,20 @@ serverSa = new InetSocketAddress(localhost, serverPort); clientSa = new InetSocketAddress(localhost, clientPort); - final ReactorHandler handler = new AbstractHandler() { - public void handleRead(Session service, InetSocketAddress src, - ByteBuffer buf) { + final DatagramHandler handler = new AbstractDatagramHandler() { + @Override + public void handleReceive(DatagramSession service, + InetSocketAddress src, ByteBuffer buf) { System.out.println("received: " + buf); } }; + // Start the server. spawn(new Runnable() { public void run() { try { Reactor r = new Reactor(); - r.register(SessionType.DATAGRAM, null, serverSa, handler); + r.createDatagramSession(null, serverSa, handler); for (int i = 0; i < 10; i++) r.schedule(makeRunnable(i), 200 * i, @@ -76,22 +78,23 @@ } }); - spawn(new Runnable() { - public void run() { - try { - Reactor r = new Reactor(); - ByteBuffer writeBuf = ByteBuffer.allocate(5); - Session s = r.register(SessionType.DATAGRAM, null, - clientSa, handler); - Thread.sleep(2000); - s.send(writeBuf, clientSa); - r.react(); - } catch (Exception ex) { - ex.printStackTrace(); - } - } - }); +// // Start the client. 
+// spawn(new Runnable() { +// public void run() { +// try { +// Reactor r = new Reactor(); +// ByteBuffer writeBuf = ByteBuffer.allocate(5); +// DatagramSession s = r.createDatagramSession(null, clientSa, handler); +// Thread.sleep(2000); +// s.send(writeBuf, clientSa); +// r.react(); +// } catch (Exception ex) { +// ex.printStackTrace(); +// } +// } +// }); + // Start a second client. spawn(new Runnable() { public void run() { try { @@ -112,64 +115,74 @@ final InetSocketAddress serverSa; serverSa = new InetSocketAddress(localhost, serverPort); final Reactor r = new Reactor(); - r.register(SessionType.STREAM, null, serverSa, new AbstractHandler() { + r.listen(serverSa, new ListenerHandler() { - @Override - public void handleAccept(Session listenerSession, - SocketChannel src, Session clientSession) { - System.out.println("accepted"); + public StreamHandler handleAccept(SocketChannel clientChannel) { + return new AbstractStreamHandler() { + + @Override + public void handleRead(SocketSession session, ByteBuffer buf) { + try { + // Short-hand for: + // Charset charset = Charset.forName("us-ascii"); + // CharsetDecoder decoder = charset.newDecoder(); + // CharBuffer charBuffer = decoder.decode(buf); + // System.out.println("read: " + + // charBuffer.toString()); + System.out.println("echoing: '" + + Charset.forName("us-ascii").decode(buf) + + "'"); + session.write(buf); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + }; } - @Override - public void handleRead(Session session, InetSocketAddress src, - ByteBuffer buf) { - try { - // Short-hand for: - // Charset charset = Charset.forName("us-ascii"); - // CharsetDecoder decoder = charset.newDecoder(); - // CharBuffer charBuffer = decoder.decode(buf); - // System.out.println("read: " + charBuffer.toString()); - System.out.println("echoing: '" - + Charset.forName("us-ascii").decode(buf) + "'"); - session.write(buf); - } catch (Exception ex) { - throw new RuntimeException(ex); - } + public void 
handleException(Exception ex) { } }); + r.schedule(new Runnable() { public void run() { - r.register(SessionType.STREAM, serverSa, null, - new AbstractHandler() { - @Override - public void handleConnect(Session session) { - System.out.println("connected"); - try { - session.write(ByteBuffer.wrap("hello" - .getBytes())); - } catch (Exception ex) { - throw new RuntimeException(ex); - } + try { + r.connect(serverSa, new AbstractStreamHandler() { + + @Override + public void handleConnect(StreamSession session) { + System.out.println("connected"); + try { + session.write(ByteBuffer.wrap("hello" + .getBytes())); + } catch (Exception ex) { + throw new RuntimeException(ex); } + } - @Override - public void handleRead(Session session, - InetSocketAddress src, ByteBuffer buf) { - System.out.println("got back: '" - + Charset.forName("us-ascii").decode( - buf) + "'"); - r.shutdown(); - } - }); + @Override + public void handleRead(SocketSession session, + ByteBuffer buf) { + System.out.println("got back: '" + + Charset.forName("us-ascii").decode(buf) + + "'"); + r.shutdown(); + } + + }); + } catch (Exception ex) { + throw new RuntimeException(ex); + } } }, 1, TimeUnit.SECONDS); r.react(); } public static void main(String args[]) throws Exception { - // new ReactorTest().testUDP(); - new ReactorTest().testTCP(); + new ReactorTest().testUDP(); + // new ReactorTest().testTCP(); } } Deleted: java-reactor/trunk/src/reactor/Session.java =================================================================== --- java-reactor/trunk/src/reactor/Session.java 2008-10-08 03:34:52 UTC (rev 1003) +++ java-reactor/trunk/src/reactor/Session.java 2008-10-08 08:41:08 UTC (rev 1004) @@ -1,309 +0,0 @@ -package reactor; - -import java.io.IOException; -import java.net.DatagramSocket; -import java.net.InetSocketAddress; -import java.net.ServerSocket; -import java.nio.ByteBuffer; -import java.nio.channels.DatagramChannel; -import java.nio.channels.SelectableChannel; -import java.nio.channels.SelectionKey; 
-import java.nio.channels.Selector; -import java.nio.channels.ServerSocketChannel; -import java.nio.channels.SocketChannel; -import java.nio.channels.WritableByteChannel; -import java.util.ArrayList; -import java.util.List; - -/** - * Represents a "session," which is probably a bad name for what is basically a - * wrapper around a TCP or UDP socket. Encapsulates some extra state information - * associated with the socket. - * - * @author yang - * - */ -public class Session { - - private final SelectableChannel channel; - private final ReactorHandler handler; - private final InetSocketAddress remoteSa; - private final ByteBuffer readBuf = ByteBuffer.allocateDirect(4096); - private final List<ByteBuffer> pendingWrites = new ArrayList<ByteBuffer>(); - private final SessionType type; - private final Selector selector; - - /** - * Construct a new Session object. - * - * @param remoteSa - * If non-null, create a restricted datagram socket, which has - * better performance but occupies a file descriptor. - * @param localSa - * The local address and port to send/receive messages on, or - * null for auto port selection. - * @param handler - * The handler for events on this socket. - * @param selector - * The selector that is used in the current reactor. 
- */ - Session(SessionType type, InetSocketAddress remoteSa, - InetSocketAddress localSa, ReactorHandler handler, Selector selector) { - this.handler = handler; - this.remoteSa = remoteSa; - this.type = type; - this.selector = selector; - - try { - switch (type) { - case STREAM: - if (localSa != null && remoteSa == null) { - ServerSocketChannel ch = ServerSocketChannel.open(); - ch.configureBlocking(false); - ServerSocket socket = ch.socket(); - socket.setReuseAddress(true); - socket.bind(localSa); - ch.register(selector, SelectionKey.OP_ACCEPT, this); - channel = ch; - } else if (localSa == null && remoteSa != null) { - SocketChannel ch = SocketChannel.open(); - ch.configureBlocking(false); - ch.connect(remoteSa); - ch.register(selector, SelectionKey.OP_CONNECT, this); - channel = ch; - } else { - throw new IllegalArgumentException( - "At least one of remoteSa or localSa must be non-null."); - } - break; - case DATAGRAM: { - DatagramChannel ch = DatagramChannel.open(); - ch.configureBlocking(false); - DatagramSocket socket = ch.socket(); - socket.setReuseAddress(true); - if (localSa != null) - socket.bind(localSa); - if (remoteSa != null) - ch.connect(remoteSa); - - ch.register(selector, SelectionKey.OP_READ, this); - - channel = ch; - break; - } - default: - throw new AssertionError("unhandled session type"); - } - } catch (Exception ex) { - throw new RuntimeException(ex); - } - } - - Session(ReactorHandler handler, SocketChannel channel, Selector selector) { - this.handler = handler; - this.remoteSa = null; - this.channel = channel; - this.type = SessionType.STREAM; - this.selector = selector; - } - - /** - * This is called by the reactor when there is a message to be received. - * Reading messages is the priority, so this is done before anything else. - * - * @param key - * The key into the selector's socket set representing our - * socket. 
- * @throws Exception - */ - void handleRead(SelectionKey key) throws Exception { - loop: while (true) { - try { - final InetSocketAddress srcSa; - - switch (type) { - case STREAM: { - if (channel instanceof ServerSocketChannel) { - ServerSocketChannel ch = (ServerSocketChannel) channel; - SocketChannel c = ch.accept(); - if (c == null) - break loop; - c.configureBlocking(false); - Session session = new Session(handler, c, selector); - c.register(selector, SelectionKey.OP_READ, session); - handler.handleAccept(this, c, session); - } else { - assert channel instanceof SocketChannel; - SocketChannel ch = (SocketChannel) channel; - srcSa = (InetSocketAddress) ch.socket() - .getRemoteSocketAddress(); - int numRead = ch.read(readBuf); - if (numRead == -1) { - // Remote entity shut the socket down cleanly. Do - // the same from our end and cancel the channel. - key.channel().close(); - key.cancel(); - break loop; - } - if (numRead == 0) - break loop; - - // after channel wrote to buf, set lim = pos, then pos = - // 0 - readBuf.flip(); - // callback - handler.handleRead(this, srcSa, readBuf); - // recycle buffer - readBuf.clear(); - } - break; - } - case DATAGRAM: { - DatagramChannel ch = (DatagramChannel) channel; - if (remoteSa == null) { - srcSa = (InetSocketAddress) ch.receive(readBuf); - } else { - int numRead = ch.read(readBuf); - if (numRead == -1) { - // Remote entity shut the socket down - // cleanly. - // Do - // the same from our end and cancel the - // channel. 
- key.channel().close(); - key.cancel(); - break loop; - } - // TODO also handle numRead == 0 - srcSa = remoteSa; - } - - if (srcSa == null) { - break loop; - } - - // after channel wrote to buf, set lim = pos, then pos = 0 - readBuf.flip(); - // callback - handler.handleRead(this, srcSa, readBuf); - // recycle buffer - readBuf.clear(); - break; - } - default: - throw new AssertionError("unhandled session type"); - } - } catch (IOException e) { - // The remote forcibly closed the connection, cancel - // the selection key and close the channel. - key.cancel(); - channel.close(); - } - } - } - - /** - * Handle the event where the socket has just established an outgoing - * connection. - */ - void handleConnect(SelectionKey key) throws IOException { - boolean result = ((SocketChannel) channel).finishConnect(); - assert result; - handler.handleConnect(this); - key.interestOps(SelectionKey.OP_READ); - } - - /** - * Handle the event where the socket is ready to be written. This is - * currently not used since the send() method blindly writes immediately to - * the socket - which is bad. - * - * @param key - * The key into the selector's socket set that represents our - * socket. - * @throws Exception - */ - void handleWrite(SelectionKey key) throws Exception { - // Write until there's not more data ... - while (!pendingWrites.isEmpty()) { - ByteBuffer buf = (ByteBuffer) pendingWrites.get(0); - int initPos = buf.position(); - int bytes = ((WritableByteChannel) channel).write(buf); - assert buf.remaining() == buf.limit() - bytes && buf.position() == initPos + bytes; - if (buf.remaining() > 0) { - // ... or the socket's buffer fills up - break; - } - pendingWrites.remove(0); - } - - if (pendingWrites.isEmpty()) { - // We wrote away all data, so we're no longer - // interested - // in writing on this socket. Switch back to waiting - // for - // data (remove OP_WRITE). 
- key.interestOps(SelectionKey.OP_READ); - } - } - - // TODO: use static (fixed) writeBuf (so that it can't change from under our - // feet)? - /** - * Write data to this stream socket. This does not enqueue anything onto the - * internal pendingWrites buffer if the socket's buffer is full, but instead - * triggers an assertion failure. - * - * @param buf - * The data to be written. - */ - public void write(ByteBuffer buf) throws Exception { - buf.rewind(); - int initPos = buf.position(); - int bytes = ((WritableByteChannel) channel).write(buf); - System.out.println("writing " + bytes); - assert buf.remaining() == buf.limit() - bytes && buf.position() == initPos + bytes; - if (buf.remaining() > 0) { - pendingWrites.add(buf); - // We're now interested in when the socket is ready for writing. - this.channel.keyFor(selector).interestOps( - SelectionKey.OP_READ | SelectionKey.OP_WRITE); - } - } - - /** - * Send messages from this socket. This does not enqueue anything onto the - * internal pendingWrites buffer if the socket's buffer is full, but instead - * triggers an assertion failure. - * - * @param writeBuf - * The packet to be sent. - * @param dst - * The destination socket address. - * @throws Exception - */ - public void send(ByteBuffer writeBuf, InetSocketAddress dst) - throws Exception { - switch (type) { - case STREAM: - throw new Exception("cannot send() on a TCP session!"); - case DATAGRAM: - int bytes = ((DatagramChannel) channel).send(writeBuf, dst); - // TODO: don't trigger an assertion failure! - assert bytes == writeBuf.limit(); - break; - default: - throw new AssertionError("unhandled session type"); - } - } - - /** - * Close the underlying socket. 
- * - * @throws Exception - */ - public void close() throws Exception { - channel.close(); - } - -} Deleted: java-reactor/trunk/src/reactor/SessionType.java =================================================================== --- java-reactor/trunk/src/reactor/SessionType.java 2008-10-08 03:34:52 UTC (rev 1003) +++ java-reactor/trunk/src/reactor/SessionType.java 2008-10-08 08:41:08 UTC (rev 1004) @@ -1,7 +0,0 @@ -package reactor; - -public enum SessionType { - - DATAGRAM, STREAM - -} Added: java-reactor/trunk/src/reactor/SocketHandler.java =================================================================== --- java-reactor/trunk/src/reactor/SocketHandler.java (rev 0) +++ java-reactor/trunk/src/reactor/SocketHandler.java 2008-10-08 08:41:08 UTC (rev 1004) @@ -0,0 +1,25 @@ +package reactor; + +import java.nio.ByteBuffer; + +public interface SocketHandler { + + /** + * Handle a read. + * + * @param session + * The session (socket) at which the packet was received. + * @param buf + * The read data. + */ + public void handleRead(SocketSession session, ByteBuffer buf); + + /** + * Handle the event where the socket is writable. + * + * @param session + * The session that is ready for writing. 
+ */ + public void handleWrite(SocketSession session); + +} Added: java-reactor/trunk/src/reactor/SocketSession.java =================================================================== --- java-reactor/trunk/src/reactor/SocketSession.java (rev 0) +++ java-reactor/trunk/src/reactor/SocketSession.java 2008-10-08 08:41:08 UTC (rev 1004) @@ -0,0 +1,124 @@ +package reactor; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.ByteChannel; +import java.nio.channels.SelectionKey; +import java.util.ArrayDeque; +import java.util.Deque; + +public abstract class SocketSession { + + abstract protected SocketHandler getHandler(); + abstract protected ByteChannel getChannel(); + abstract protected SelectionKey getKey(); + + private final ByteBuffer readBuf = ByteBuffer.allocateDirect(4096); + private final Deque<ByteBuffer> pendingWrites = new ArrayDeque<ByteBuffer>(); + + /** + * This is called by the reactor when there is a message to be received. + * Reading messages is the priority, so this is done before anything else. + * + * @param key + * The key into the selector's socket set representing our + * socket. + * @throws Exception + */ + void handleRead(SelectionKey key) throws Exception { + while (true) { + try { + int numRead = getChannel().read(readBuf); + if (numRead == -1) { + // Remote entity shut the socket down cleanly. Do + // the same from our end and cancel the channel. + getChannel().close(); + key.cancel(); + break; + } + if (numRead == 0) + break; + + // after channel wrote to buf, set lim = pos, then pos = + // 0 + readBuf.flip(); + // callback + getHandler().handleRead(this, readBuf); + // recycle buffer + readBuf.clear(); + } catch (IOException e) { + // The remote forcibly closed the connection, cancel + // the selection key and close the channel. + key.cancel(); + getChannel().close(); + } + } + } + + /** + * Handle the event where the socket is ready to be written. 
This is + * currently not used since the send() method blindly writes immediately to + * the socket - which is bad. + * + * @param key + * The key into the selector's socket set that represents our + * socket. + * @throws Exception + */ + void handleWrite(SelectionKey key) throws Exception { + // Write until there's not more data ... + while (!pendingWrites.isEmpty()) { + ByteBuffer buf = (ByteBuffer) pendingWrites.getFirst(); + int initPos = buf.position(); + int bytes = getChannel().write(buf); + assert buf.remaining() == buf.limit() - bytes && buf.position() == initPos + bytes; + if (buf.remaining() > 0) { + // ... or the socket's buffer fills up + break; + } + pendingWrites.removeFirst(); + } + + if (pendingWrites.isEmpty()) { + // We wrote away all data, so we're no longer + // interested + // in writing on this socket. Switch back to waiting + // for + // data (remove OP_WRITE). + key.interestOps(SelectionKey.OP_READ); + } + } + + // TODO: use static (fixed) writeBuf (so that it can't change from under our + // feet)? + /** + * Write data to this stream socket. This does not enqueue anything onto the + * internal pendingWrites buffer if the socket's buffer is full, but instead + * triggers an assertion failure. + * + * @param buf + * The data to be written. + */ + public void write(ByteBuffer buf) throws Exception { + buf.rewind(); + int initPos = buf.position(); + int bytes = getChannel().write(buf); + System.out.println("writing " + bytes); + assert buf.remaining() == buf.limit() - bytes && buf.position() == initPos + bytes; + if (buf.remaining() > 0) { + pendingWrites.add(buf); + // We're now interested in when the socket is ready for writing. + getKey().interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); + } + } + + /** + * Close the underlying socket. 
+ * + * @throws Exception + */ + public void close() throws Exception { + getChannel().close(); + } + +} Added: java-reactor/trunk/src/reactor/StreamHandler.java =================================================================== --- java-reactor/trunk/src/reactor/StreamHandler.java (rev 0) +++ java-reactor/trunk/src/reactor/StreamHandler.java 2008-10-08 08:41:08 UTC (rev 1004) @@ -0,0 +1,15 @@ +package reactor; + +public interface StreamHandler extends SocketHandler { + + /** + * Handle a successful outgoing TCP connection (we're the client). + * + * @param session + * The session for this socket. + * @param channel + * The channel. + */ + public void handleConnect(StreamSession session); + +} Added: java-reactor/trunk/src/reactor/StreamSession.java =================================================================== --- java-reactor/trunk/src/reactor/StreamSession.java (rev 0) +++ java-reactor/trunk/src/reactor/StreamSession.java 2008-10-08 08:41:08 UTC (rev 1004) @@ -0,0 +1,60 @@ +package reactor; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.channels.ByteChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; + +public class StreamSession extends SocketSession { + + private final SocketChannel channel; + private final StreamHandler handler; + private final SelectionKey key; + + @Override + protected ByteChannel getChannel() { + return channel; + } + + @Override + protected SocketHandler getHandler() { + return handler; + } + + @Override + protected SelectionKey getKey() { + return key; + } + + public StreamSession(Selector selector, InetSocketAddress remoteSa, + StreamHandler handler) throws Exception { + SocketChannel ch = SocketChannel.open(); + ch.configureBlocking(false); + ch.connect(remoteSa); + key = ch.register(selector, SelectionKey.OP_CONNECT, this); + this.handler = handler; + channel = ch; + } + + StreamSession(Selector selector, SocketChannel 
ch, StreamHandler handler) + throws Exception { + key = ch.register(selector, SelectionKey.OP_READ, this); + this.handler = handler; + channel = ch; + } + + /** + * Handle the event where the socket has just established an outgoing + * connection. + */ + void handleConnect(SelectionKey key) throws IOException { + assert this.key == key; + boolean result = channel.finishConnect(); + assert result; + handler.handleConnect(this); + key.interestOps(SelectionKey.OP_READ); + } + +}
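The write path added in SocketSession.java above (write() keeps whatever the channel did not accept, handleWrite() drains that queue once the key becomes writable) can be tried in isolation. What follows is only a minimal sketch, not the committed class: PendingWritesSketch and ThrottledChannel are hypothetical names, the fake channel stands in for a socket whose send buffer fills up, and the methods return booleans where the real code toggles OP_WRITE on the selection key. Unlike the committed write(), the sketch does not rewind the buffer, and it queues new data behind buffers that are already pending so that bytes stay in order.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical stand-in for the write path of SocketSession; not the committed class. */
final class PendingWritesSketch {

    /** Fake channel that accepts at most `capacity` bytes per write() call. */
    static final class ThrottledChannel implements WritableByteChannel {
        final StringBuilder sink = new StringBuilder();
        private final int capacity;

        ThrottledChannel(int capacity) {
            this.capacity = capacity;
        }

        @Override
        public int write(ByteBuffer src) {
            int n = Math.min(capacity, src.remaining());
            for (int i = 0; i < n; i++)
                sink.append((char) src.get());
            return n;
        }

        @Override
        public boolean isOpen() {
            return true;
        }

        @Override
        public void close() {
        }
    }

    private final WritableByteChannel channel;
    private final Deque<ByteBuffer> pendingWrites = new ArrayDeque<ByteBuffer>();

    PendingWritesSketch(WritableByteChannel channel) {
        this.channel = channel;
    }

    /** Write now if possible; returns false if (part of) buf had to be queued. */
    boolean write(ByteBuffer buf) {
        try {
            if (pendingWrites.isEmpty())
                channel.write(buf);
            if (buf.hasRemaining()) {
                // A real session would now add OP_WRITE to its interest set.
                pendingWrites.addLast(buf);
                return false;
            }
            return true;
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    /** Drain queued buffers; returns true once nothing is pending any more. */
    boolean handleWrite() {
        try {
            while (!pendingWrites.isEmpty()) {
                ByteBuffer buf = pendingWrites.getFirst();
                channel.write(buf);
                if (buf.hasRemaining())
                    break; // the channel is full again, wait for the next writable event
                pendingWrites.removeFirst();
            }
            // When this is true a real session would drop OP_WRITE again.
            return pendingWrites.isEmpty();
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    public static void main(String[] args) {
        ThrottledChannel ch = new ThrottledChannel(4);
        PendingWritesSketch s = new PendingWritesSketch(ch);
        s.write(ByteBuffer.wrap("hello world".getBytes(StandardCharsets.US_ASCII)));
        s.write(ByteBuffer.wrap("!".getBytes(StandardCharsets.US_ASCII)));
        while (!s.handleWrite())
            System.out.println("still pending, written so far: " + ch.sink);
        System.out.println("done: " + ch.sink);
    }
}
```

Returning a boolean keeps the sketch independent of a Selector; in the reactor the same two decision points are where interestOps() is switched between OP_READ and OP_READ | OP_WRITE.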
From: <yan...@us...> - 2008-10-08 03:34:56
Revision: 1003
          http://assorted.svn.sourceforge.net/assorted/?rev=1003&view=rev
Author:   yangzhang
Date:     2008-10-08 03:34:52 +0000 (Wed, 08 Oct 2008)

Log Message:
-----------
shutdown test

Modified Paths:
--------------
    java-reactor/trunk/src/reactor/ReactorTest.java

Modified: java-reactor/trunk/src/reactor/ReactorTest.java
===================================================================
--- java-reactor/trunk/src/reactor/ReactorTest.java  2008-10-07 22:35:53 UTC (rev 1002)
+++ java-reactor/trunk/src/reactor/ReactorTest.java  2008-10-08 03:34:52 UTC (rev 1003)
@@ -112,7 +112,7 @@
         final InetSocketAddress serverSa;
         serverSa = new InetSocketAddress(localhost, serverPort);
         final Reactor r = new Reactor();
-        ReactorHandler h = new AbstractHandler() {
+        r.register(SessionType.STREAM, null, serverSa, new AbstractHandler() {
 
             @Override
             public void handleAccept(Session listenerSession,
@@ -137,8 +137,7 @@
                 }
             }
 
-        };
-        r.register(SessionType.STREAM, null, serverSa, h);
+        });
         r.schedule(new Runnable() {
             public void run() {
                 r.register(SessionType.STREAM, serverSa, null,
@@ -160,6 +159,7 @@
                                 System.out.println("got back: '"
                                         + Charset.forName("us-ascii").decode(
                                                 buf) + "'");
+                                r.shutdown();
                             }
                         });
             }
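The test above now ends the run by calling r.shutdown() from the client's handleRead, which lets r.react() return. Reactor.shutdown() itself is not part of this diff, so the following is only a guess at one common way to stop a selector loop: a volatile flag plus Selector.wakeup(). ShutdownSketch and all of its members are hypothetical and are not taken from the repository.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;

/** Hypothetical sketch of a stoppable selector loop; not the Reactor class itself. */
final class ShutdownSketch {

    private final Selector selector;
    private volatile boolean running = true;

    ShutdownSketch() {
        try {
            selector = Selector.open();
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    /** Loop until shutdown() has been called. */
    void react() {
        try {
            while (running) {
                // Blocks until a key is ready or wakeup() is called.
                selector.select();
                // A real loop would dispatch the ready keys here.
                selector.selectedKeys().clear();
            }
            // Closing the selector is left to the owner, once no other
            // thread can still call wakeup() on it.
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    /** May be called from a handler on the loop thread or from another thread. */
    void shutdown() {
        running = false;
        // If select() is blocking, this makes it return; if it has not
        // started yet, the next select() returns immediately.
        selector.wakeup();
    }

    boolean isRunning() {
        return running;
    }

    public static void main(String[] args) {
        final ShutdownSketch r = new ShutdownSketch();
        new Thread(new Runnable() {
            public void run() {
                r.shutdown();
            }
        }).start();
        r.react();
        System.out.println("react() returned, running = " + r.isRunning());
    }
}
```

Setting the flag before calling wakeup() matters: in the opposite order the loop could wake up, still see running == true, and block in select() again.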