From: <nc...@us...> - 2011-08-13 08:27:03
|
Revision: 842 http://pytrainer.svn.sourceforge.net/pytrainer/?rev=842&view=rev Author: ncjones Date: 2011-08-13 08:26:55 +0000 (Sat, 13 Aug 2011) Log Message: ----------- Add sport service. ticket:138 Modified Paths: -------------- pytrainer/trunk/pytrainer/sport.py pytrainer/trunk/pytrainer/test/sport_test.py Modified: pytrainer/trunk/pytrainer/sport.py =================================================================== --- pytrainer/trunk/pytrainer/sport.py 2011-08-13 08:26:10 UTC (rev 841) +++ pytrainer/trunk/pytrainer/sport.py 2011-08-13 08:26:55 UTC (rev 842) @@ -16,7 +16,8 @@ #along with this program; if not, write to the Free Software #Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. -from pytrainer.lib.color import Color +from pytrainer.lib.color import Color, color_from_hex_string +import logging class Sport(object): @@ -90,3 +91,130 @@ self._color = color color = property(_get_color, _set_color) + +class SportServiceException(Exception): + + def __init__(self, value): + self.value = value + + def __str__(self): + return repr(self.value) + +_TABLE = "sports" + +_ID_COLUMN = "id_sports" + +_NAME_COLUMN = "name" + +_UPDATE_COLUMNS = _NAME_COLUMN + ",weight,met,max_pace,color" + +_SELECT_COLUMNS = _ID_COLUMN + "," + _UPDATE_COLUMNS + +class SportService(object): + + """Provides access to stored sports.""" + + def __init__(self, ddbb): + self._ddbb = ddbb + + def _create_sport(self, row): + sport = Sport() + sport.id = row[0] + sport.name = unicode(row[1]) + sport.weight = row[2] + sport.met = row[3] + sport.max_pace = row[4] + sport.color = color_from_hex_string(row[5]) + return sport + + def _create_row(self, sport): + return [sport.name, + sport.weight, + sport.met, + sport.max_pace, + sport.color.to_hex_string()] + + def _create_id_where_clause(self, sport_id): + return _ID_COLUMN + "=" + str(sport_id) + + def _create_name_where_clause(self, sport_name): + return _NAME_COLUMN + "=\"{0}\"".format(sport_name) + + def get_sport(self, sport_id): + """Get the sport with the specified id. + + If no sport with the given id exists then None is returned.""" + resultSet = self._ddbb.select(_TABLE, _SELECT_COLUMNS, self._create_id_where_clause(sport_id)) + if len(resultSet) == 0: + return None + else: + return self._create_sport(resultSet[0]) + + def get_sport_by_name(self, name): + """Get the sport with the specified name. + + If no sport with the given name exists then None is returned.""" + sport_id = self._get_sport_id_from_name(name) + return self.get_sport(sport_id) + + def _get_sport_id_from_name(self, name): + result_set = self._ddbb.select(_TABLE, _ID_COLUMN, self._create_name_where_clause(name)) + if len(result_set) > 0: + return result_set[0][0] + return None + + def get_all_sports(self): + """Get all stored sports.""" + result_set = self._ddbb.select(_TABLE, _SELECT_COLUMNS) + logging.debug("Retrieved all sports ({0} results).".format(len(result_set))) + sports = [] + for row in result_set: + sport = self._create_sport(row) + sports.append(sport) + return sports + + def store_sport(self, sport): + """Store a new or update an existing sport. + + The stored object is returned.""" + if (sport.id is None): + sport_id = self._store_new_sport(sport) + else: + sport_id = self._update_existing_sport(sport) + return self.get_sport(sport_id) + + def _store_new_sport(self, sport): + self._assert_unique(sport) + self._ddbb.insert(_TABLE, _UPDATE_COLUMNS, self._create_row(sport)) + logging.debug("Stored new sport: '{0}'.".format(sport.name)) + return self._get_sport_id_from_name(sport.name) + + def _update_existing_sport(self, sport): + self._assert_exists(sport) + self._assert_unique(sport) + self._ddbb.update(_TABLE, _UPDATE_COLUMNS, self._create_row(sport), self._create_id_where_clause(sport.id)) + logging.debug("Updated sport: '{0}'.".format(sport.name)) + return sport.id + + def _assert_unique(self, sport): + id = self._get_sport_id_from_name(sport.name) + if id is not None and id != sport.id: + raise SportServiceException("A sport already exists with name '{0}'".format(sport.name)) + logging.debug("Asserted sport name is unique: '{0}'.".format(sport.name)) + + def _assert_exists(self, sport): + result_set = self._ddbb.select(_TABLE, _ID_COLUMN, self._create_id_where_clause(sport.id)) + if (result_set == []): + raise SportServiceException("Sport does not exist with id: '{0}'.".format(sport.id)) + logging.debug("Asserted sport exists with id: '{0}'.".format(sport.id)) + + def remove_sport(self, sport): + """Delete a stored sport. + + All records associated with the sport will also be deleted.""" + if (sport.id is None): + raise SportServiceException("Cannot remove sport which has not been stored: '{0}'.".format(sport.name)) + self._assert_exists(sport) + self._ddbb.delete("records", "sport=" + str(sport.id)) + self._ddbb.delete(_TABLE, self._create_id_where_clause(sport.id)) + logging.debug("Deleted sport: '{0}'.".format(sport.name)) Modified: pytrainer/trunk/pytrainer/test/sport_test.py =================================================================== --- pytrainer/trunk/pytrainer/test/sport_test.py 2011-08-13 08:26:10 UTC (rev 841) +++ pytrainer/trunk/pytrainer/test/sport_test.py 2011-08-13 08:26:55 UTC (rev 842) @@ -17,7 +17,10 @@ #Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. import unittest -from pytrainer.sport import Sport +from pytrainer.sport import Sport, SportService, SportServiceException +import mock +from pytrainer.lib.sqliteUtils import Sql +import pytrainer class SportTest(unittest.TestCase): @@ -212,3 +215,164 @@ pass else: self.fail() + + +class SportServiceTest(unittest.TestCase): + + def setUp(self): + self.mock_ddbb = mock.Mock(spec=Sql) + self.sport_service = SportService(self.mock_ddbb) + + def test_store_sport_should_insert_row_when_sport_has_no_id(self): + self.mock_ddbb.select.return_value = [] + sport = Sport() + sport.name = u"Test name" + self.sport_service.store_sport(sport) + self.mock_ddbb.insert.assert_called_with("sports", "name,weight,met,max_pace,color", + [u"Test name", 0.0, None, None, "0000ff"]) + + def test_store_sport_should_update_row_when_sport_has_id(self): + def mock_select(table, columns, where): + if columns == "id_sports": + return [[1]] + else: + return [(1, u"", 0, 0, 0, "0")] + self.mock_ddbb.select = mock.Mock(wraps=mock_select) + sport = Sport() + sport.id = 1 + sport.name = u"New name" + self.sport_service.store_sport(sport) + self.mock_ddbb.update.assert_called_with("sports", "name,weight,met,max_pace,color", + [u"New name", 0.0, None, None, "0000ff"], "id_sports=1") + + def test_store_sport_should_return_stored_sport(self): + sport_ids = [] + def update_sport_ids(*args): + sport_ids.append([1]) + self.mock_ddbb.insert.side_effect = update_sport_ids + def mock_select(table, columns, where): + if columns == "id_sports": + return sport_ids + else: + return [(2, u"", 0, 0, 0, "0")] + self.mock_ddbb.select = mock.Mock(wraps=mock_select) + sport = Sport() + stored_sport = self.sport_service.store_sport(sport) + self.assertEquals(2, stored_sport.id) + + def test_store_sport_should_error_when_sport_has_unknown_id(self): + self.mock_ddbb.select.return_value = [] + sport = Sport() + sport.id = 100 + try: + self.sport_service.store_sport(sport) + except(SportServiceException): + pass + else: + self.fail() + + def test_store_sport_should_error_when_new_sport_has_duplicate_name(self): + self.mock_ddbb.select.return_value = [(1, u"Test name", 150, 12.5, 200, "0")] + sport = Sport() + sport.name = u"Test name" + try: + self.sport_service.store_sport(sport) + except(SportServiceException): + pass + else: + self.fail() + + def test_store_sport_should_error_when_existing_sport_has_duplicate_name(self): + def mock_select(table, columns, where): + if columns == pytrainer.sport._ID_COLUMN: + return [[2]] + else: + return [(1, u"Test name", 0, 0.0, "0"), (2, u"New name", 0, 0.0, "0")] + self.mock_ddbb.select = mock.Mock(wraps=mock_select) + sport = Sport() + sport.id = 1 + sport.name = u"New name" + try: + self.sport_service.store_sport(sport) + except(SportServiceException): + pass + else: + self.fail() + + def test_get_sport_returns_none_for_nonexistant_sport(self): + self.mock_ddbb.select.return_value = [] + sport = self.sport_service.get_sport(1) + self.assertEquals(None, sport) + + def test_get_sport_returns_sport_with_id(self): + self.mock_ddbb.select.return_value = [(1, u"", 0, 0, 0, "0")] + sport = self.sport_service.get_sport(1) + self.assertEquals(1, sport.id) + + def test_get_sport_by_name_returns_none_for_nonexistant_sport(self): + self.mock_ddbb.select.return_value = [] + sport = self.sport_service.get_sport("no such sport") + self.assertEquals(None, sport) + + def test_get_sport_by_name_returns_sport_with_name(self): + def mock_select(table, columns, where): + if columns == "id_sport": + return [(1)] + else: + return [(1, u"rugby", 0, 0, 0, "0")] + self.mock_ddbb.select = mock.Mock(wraps=mock_select) + sport = self.sport_service.get_sport("rugby") + self.assertEquals(u"rugby", sport.name) + + def test_get_all_sports_should_return_all_sports_in_query_result(self): + self.mock_ddbb.select.return_value = [(1, u"Test name", 0, 0, 0, "0"), (2, u"Test name 2", 0, 0, 0, "0")] + sports = self.sport_service.get_all_sports() + self.assertEquals(2, len(sports)) + sport1 = sports[0] + self.assertEquals(1, sport1.id) + sport2 = sports[1] + self.assertEquals(2, sport2.id) + + def test_get_all_sports_should_return_no_sports_when_query_result_empty(self): + self.mock_ddbb.select.return_value = [] + sports = self.sport_service.get_all_sports() + self.assertEquals(0, len(sports)) + + def test_remove_sport_should_error_when_sport_has_no_id(self): + self.mock_ddbb.select.return_value = [(1, u"Test name", 150, 12.5, 200, "0")] + sport = Sport() + try: + self.sport_service.remove_sport(sport) + except(SportServiceException): + pass + else: + self.fail() + + def test_remove_sport_should_error_when_sport_has_unknown_id(self): + self.mock_ddbb.select.return_value = [] + sport = Sport() + sport.id = 100 + try: + self.sport_service.remove_sport(sport) + except(SportServiceException): + pass + else: + self.fail() + + def test_remove_sport_should_delete_sport_with_specified_id(self): + self.mock_ddbb.select.return_value = [[1]] + sport = Sport() + sport.id = 1 + self.sport_service.remove_sport(sport) + self.mock_ddbb.delete.assert_called_with("sports", "id_sports=1") + + def test_remove_sport_should_remove_associated_entries(self): + self.mock_ddbb.select.return_value = [[1]] + sport = Sport() + sport.id = 1 + delete_arguments = [] + def mock_delete(*args): + delete_arguments.append(args) + self.mock_ddbb.delete = mock.Mock(wraps=mock_delete) + self.sport_service.remove_sport(sport) + self.assertEquals(("records", "sport=1"), delete_arguments[0]) This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |