From: <bro...@us...> - 2008-03-11 03:00:48
|
Revision: 146 http://gridsim.svn.sourceforge.net/gridsim/?rev=146&view=rev Author: brobergj Date: 2008-03-10 20:00:53 -0700 (Mon, 10 Mar 2008) Log Message: ----------- Porting FlowBuffer back to separate FlowInput/FlowOutput Added Paths: ----------- branches/gridsim4.0-branch2/source/gridsim/net/flow/FlowInput.java Added: branches/gridsim4.0-branch2/source/gridsim/net/flow/FlowInput.java =================================================================== --- branches/gridsim4.0-branch2/source/gridsim/net/flow/FlowInput.java (rev 0) +++ branches/gridsim4.0-branch2/source/gridsim/net/flow/FlowInput.java 2008-03-11 03:00:53 UTC (rev 146) @@ -0,0 +1,493 @@ +/* + * Title: GridSim Toolkit + * Description: GridSim (Grid Simulation) Toolkit for Modeling and Simulation + * of Parallel and Distributed Systems such as Clusters and Grids + * Licence: GPL - http://www.gnu.org/copyleft/gpl.html + * + * $Id: Input.java,v 1.7 2005/09/02 04:12:04 anthony Exp $ + */ + +package gridsim.net.flow; + +import gridsim.*; +import gridsim.net.*; +import gridsim.util.TrafficGenerator; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; + +import eduni.simjava.*; + +/** + * GridSim Input class defines a port through which a simulation entity + * receives data from the simulated network. + * <p> + * It maintains an event queue + * to serialize the data-in-flow and delivers to its parent entity. + * It accepts messages that comes from GridSim entities 'Output' entity + * and passes the same to the GridSim entity. + * It simulates Network communication delay depending on Baud rate + * and data length. Simultaneous inputs can be modeled using multiple + * instances of this class. + * + * @author Manzur Murshed and Rajkumar Buyya + * @since GridSim Toolkit 1.0 + * @invariant $none + */ +public class FlowInput extends Sim_entity implements NetIO +{ + private Sim_port inPort_; + private Link link_; + private double baudRate_; + private static final int BITS = 8; // 1 byte = 8 bits + private final int SIZE = 8; // 1 byte in bits + + + private HashMap<Integer, Packet> activeFlows_; // stores a list of active Flows + + + /** + * Allocates a new Input object + * @param name the name of this object + * @param baudRate the communication speed + * @throws NullPointerException This happens when creating this entity + * before initializing GridSim package or this entity name + * is <tt>null</tt> or empty + * @pre name != null + * @pre baudRate >= 0.0 + * @post $none + */ + public FlowInput(String name, double baudRate) throws NullPointerException + { + super(name); + this.baudRate_ = baudRate; + link_= null; + + inPort_ = new Sim_port("input_buffer"); + super.add_port(inPort_); + + activeFlows_ = null; + + } + + /** + * Sets the Input entities link. This should be used only if the network + * extensions are being used. + * @param link the link to which this Input entity should send data + * @pre link != null + * @post $none + */ + public void addLink(Link link) + { + this.link_ = link; + baudRate_ = link_.getBaudRate(); + activeFlows_ = new HashMap(); + + } + + /** + * Gets the baud rate + * @return the baud rate + * @deprecated As of GridSim 2.1, replaced by {@link #getBaudRate()} + * @pre $none + * @post $result >= 0.0 + */ + public double GetBaudRate() { + return this.getBaudRate(); + } + + /** + * Gets the baud rate + * @return the baud rate + * @pre $none + * @post $result >= 0.0 + */ + public double getBaudRate() { + return baudRate_; + } + + /** + * Gets the I/O real number based on a given value + * @param value the specified value + * @return real number + * @deprecated As of GridSim 2.1, replaced by {@link #realIO(double)} + * @pre value >= 0.0 + * @post $result >= 0.0 + */ + public double real_io(double value) { + return this.realIO(value); + } + + /** + * Gets the I/O real number based on a given value + * @param value the specified value + * @return real number + * @pre value >= 0.0 + * @post $result >= 0.0 + */ + public double realIO(double value) { + return GridSimRandom.realIO(value); + } + + /** + * This is an empty method and only applicable to + * {@link gridsim.net.Output} class. + * @param gen a background traffic generator + * @param userName a collection of user entity name (in String object). + * @return <tt>false</tt> since this method is not used by this class. + * @pre gen != null + * @pre userName != null + * @post $none + * @see gridsim.net.Output + */ + public boolean setBackgroundTraffic(TrafficGenerator gen, + Collection userName) + { + return false; + } + + /** + * This is an empty method and only applicable to + * {@link gridsim.net.Output} class. + * @param gen a background traffic generator + * @return <tt>false</tt> since this method is not used by this class. + * @pre gen != null + * @post $none + * @see gridsim.net.Output + */ + public boolean setBackgroundTraffic(TrafficGenerator gen) + { + return false; + } + + /** + * A method that gets one process event at one time until the end + * of a simulation, then delivers an event to the entity (its parent) + * @pre $none + * @post $none + */ + public synchronized void body() + { + // Process events + Object obj = null; + while ( Sim_system.running() ) + { + Sim_event ev = new Sim_event(); + super.sim_get_next(ev); // get the next event in the queue + obj = ev.get_data(); // get the incoming data + + System.out.println(super.get_name() + ".body(): ev.get_tag() is " + ev.get_tag()); + System.out.println(super.get_name() + ".body(): ev.get_src() is " + ev.get_src()); + + // if the simulation finishes then exit the loop + if (ev.get_tag() == GridSimTags.END_OF_SIMULATION) { + break; + // Check if initial flow duration estimation was accurate + } else if(ev.get_tag() == GridSimTags.FLOW_HOLD) { + System.out.println(super.get_name() + ".body(): checkForecast() + at time = " + GridSim.clock()); + checkForecast(ev); + break; + // Update flow duration forecast + } else if (ev.get_tag() == GridSimTags.FLOW_UPDATE) { + System.out.println(super.get_name() + ".body(): updateForecast() + at time = " + GridSim.clock()); + updateForecast(ev); + break; + //} else if (ev.get_tag() == GridSimTags.FLOW_ACK) { + // System.out.println(super.get_name() + ".body(): submitAckToLink() + at time = " + GridSim.clock()); + //submitAckToLink(ev); + // break; + } + + // if this entity is not connected in a network topology + if (obj != null && obj instanceof IO_data) { + System.out.println(super.get_name() + ".body(): getDataFromEvent() + at time = " + GridSim.clock()); + getDataFromEvent(ev); + } + + // if this entity belongs to a network topology + else if (obj != null && link_ != null) { + System.out.println(super.get_name() + ".body(): getDataFromLink() + at time = " + GridSim.clock()); + getDataFromLink(ev); + } + + ev = null; // reset to null for gc to collect + } + } + + /** + * Check the forecast of a flow, and send data to output port if flow still exists + * + * @param ev the flow hold notification event + * @pre ev != null + * @post $none + */ + private synchronized void checkForecast(Sim_event ev) { + int pktID = (Integer) ev.get_data(); // ID of flow to be checked + FlowPacket fp = null; // Reference to flow packet that needs forecast update + FlowPacket fpAck = null; + + System.out.println(super.get_name() + ".checkForecast(): checking pkt id # " + pktID); + + // If flow hasn't already finished, send it to inPort + if ((fp = (FlowPacket) activeFlows_.get(pktID)) != null) { + Object data = fp.getData(); + IO_data io = new IO_data( data, fp.getSize(), + inPort_.get_src()); + super.sim_schedule(inPort_, GridSimTags.SCHEDULE_NOW, fp.getTag() , io.getData()); + activeFlows_.remove(pktID); + + // Send ack to source of flow + System.out.println(super.get_name() + ".checkForecast(): flow came from " + GridSim.getEntityName(fp.getSrcID()) + + " heading to " + GridSim.getEntityName(fp.getDestID())); + int oldDestID = fp.getDestID(); + int oldSrcID = fp.getSrcID(); + + IO_data ackData = new IO_data(fp.getData(), fp.getSize(), oldSrcID-2); + fpAck = new FlowPacket(ackData,fp.getID(),fp.getSize(),GridSimTags.FLOW_RETURN,oldDestID+2, + oldSrcID-2, fp.getNetServiceType(), 1, 1); + fpAck.setSize(fp.getSize()); + fpAck.setRemSize(0); + fpAck.setStartTime(fp.getStartTime()); + fpAck.setLatency(fp.getLatency()); + + System.out.println("Sending flow packet ack to link at time = " + GridSim.clock() + " id is " + + fpAck.getID()); + super.sim_schedule(GridSim.getEntityId("Output_" + GridSim.getEntityName(oldDestID)), + GridSimTags.SCHEDULE_NOW, GridSimTags.FLOW_ACK, fpAck); + + //super.sim_schedule(super.get_id(), GridSimTags.SCHEDULE_NOW, GridSimTags.FLOW_ACK, fpAck); + + } else { + System.out.println(super.get_name() + ".checkForecast(): pkt id # " + pktID + " already removed"); + + } + + } + + /** + * Update the forecast of a flow, delete the old forecast and schedule a new flow hold + * event in the future with the corrected forecast + * + * @param ev the flow update notification event + * @pre ev != null + * @post $none + */ + private synchronized void updateForecast(Sim_event ev) { + int pktID = (Integer) ev.get_data(); // ID of flow to be updated + FlowPacket fp = null; // Reference to flow packet that needs forecast update + double duration = 0.0; // New forecast duration from current Gridsim.clock() + long remSizeOld = 0; // Previous remaining size + double bandwidthOld = 0.0; // Previous bottleneck BW + int sourceID = ev.get_src(); // ID of source of notification (FlowLink) + int cancelledFlow = 0; // Count of canceled future events that match old forecast + + System.out.println(super.get_name() + ".updateForecast(): updating pkt id # " + pktID); + + // If flow hasn't already finished and been cleared... + if ((fp = (FlowPacket) activeFlows_.get(pktID)) != null) { + remSizeOld = fp.getRemSize(); + bandwidthOld = fp.getBandwidth_(); + System.out.println(super.get_name() + "updateForecast(): rem size is " + remSizeOld + "BW old is " + bandwidthOld); + Iterator it = (fp.getLinks_()).iterator(); + + while (it.hasNext()) { + FlowLink fl = (FlowLink) it.next(); + if (fl.get_id() == sourceID) { + fp.setBandwidth_(fl.getBaudRate()); + fp.setBottleneckID(sourceID); + } + } + + fp.setRemSize((long)(remSizeOld*BITS - (GridSim.clock()-fp.getUpdateTime())*bandwidthOld)); + duration = fp.getRemSize()/fp.getBandwidth_(); + System.out.println(super.get_name() + " new remaining duration add " + duration); + + FilterFlow filter = new FilterFlow(fp.getID(), GridSimTags.FLOW_HOLD); + cancelledFlow = this.sim_cancel(filter, null); + + if (cancelledFlow != 0) { + System.out.println(super.get_name() + ".updateForecast(): old forecast cancelled #matches " + + cancelledFlow); + } + + + super.sim_schedule(super.get_id(), duration, GridSimTags.FLOW_HOLD , new Integer(fp.getID())); + } + } + + /** + * Process incoming event for data without using the network extension + * @param ev a Sim_event object + * @pre ev != null + * @post $none + */ + private void getDataFromEvent(Sim_event ev) + { + IO_data io = (IO_data) ev.get_data(); + + // if the sender is not part of the overall network topology + // whereas this entity is, then need to return back the data, + // since it is not compatible. + if (link_ != null) + { + // outName = "Output_xxx", where xxx = sender entity name + String outName = GridSim.getEntityName( ev.get_src() ); + + // NOTE: this is a HACK job. "Output_" has 7 chars. So, + // the idea is to get only the entity name by removing + // "Output_" word in the outName string. + String name = outName.substring(7); + + // if the sender is not system GIS then ignore the message + if (GridSim.getEntityId(name) != GridSim.getGridInfoServiceEntityId()) + { + // sends back the data to "Input_xxx", where + // xxx = sender entity name. If not sent, then the sender + // will wait forever to receive this data. As a result, + // the whole simulation program will be hanged or does not + // terminate successfully. + int id = GridSim.getEntityId("Input_" + name); + super.sim_schedule(id, 0.0, ev.get_tag(), io); + + // print an error message + System.out.println(super.get_name() + ".body(): Error - " + + "incompatible message protocol."); + System.out.println(" Sender: " + name + " is not part " + + "of this entity's network topology."); + System.out.println(" Hence, sending back the received data."); + System.out.println(); + return; + } + } + + // NOTE: need to have a try-catch statement. This is because, + // if the above if statement holds, then Input_receiver will send + // back to Input_sender without going through Output_receiver entity. + // Hence, a try-catch is needed to prevent exception of wrong casting. + try + { + // Simulate Transmission Time after Receiving + // Hold first then dispatch + double senderBaudRate = ( (FlowOutput) + Sim_system.get_entity(ev.get_src()) ).getBaudRate(); + + // NOTE: io is in byte and baud rate is in bits. 1 byte = 8 bits + // So, convert io into bits + double minBaudRate = Math.min(baudRate_, senderBaudRate); + double communicationDelay = GridSimRandom.realIO( + (io.getByteSize() * BITS) / minBaudRate); + + // NOTE: Below is a deprecated method for SimJava 2 + //super.sim_hold(communicationDelay); + super.sim_process(communicationDelay); // receiving time + } + catch (Exception e) { + // .... empty + } + + // Deliver Event to the entity (its parent) to which + // it is acting as buffer + super.sim_schedule( inPort_, GridSimTags.SCHEDULE_NOW, + ev.get_tag(), io.getData() ); + } + + private synchronized void submitAckToLink(Sim_event ev) + { + + FlowPacket fp = (FlowPacket)ev.get_data(); + System.out.println("Sending flow packet ack to link at time = " + GridSim.clock() + " id is " + + ((FlowPacket) ev.get_data()).getID()); + super.sim_schedule(this.link_.get_id(), GridSimTags.SCHEDULE_NOW, GridSimTags.PKT_FORWARD, fp); + } + + /** + * Process incoming events from senders that are using the network + * extension + * @param ev a Sim_event object + * @pre ev != null + * @post $none + */ + private void getDataFromLink(Sim_event ev) + { + Object obj = ev.get_data(); + double duration = 0.0; + + if (obj instanceof Packet) + { + // decrypt the packet into original format + Packet pkt = (Packet) ev.get_data(); + + if (pkt instanceof InfoPacket) + { + processPingRequest( (InfoPacket) pkt); + return; + } + + if (pkt instanceof FlowPacket) + { + FlowPacket np = (FlowPacket) pkt; + int tag = np.getTag(); + System.out.println("Packet id is " + np.getID()); + + // ignore incoming junk packets + if (tag == GridSimTags.JUNK_PKT) { + return; + } + + // ignore incoming null dummy packets + if (tag == GridSimTags.EMPTY_PKT && np.getData() == null) { + return; + } + + // convert the packets into IO_data + Object data = np.getData(); + IO_data io = new IO_data( data, np.getSize(), + inPort_.get_src()); + System.out.println(super.get_name() + ".getDataFromLink() Time now " + GridSim.clock() + + " bottleneck is " + np.getBandwidth_() + " sum lat is " + np.getLatency() ); + + // if flow terminates at next entity, add to active flows + // & hold for appropriate duration + if (pkt.getTag() == GridSimTags.FLOW_SUBMIT) { + np.setStartTime(GridSim.clock()); + np.setUpdateTime(GridSim.clock()); + duration = np.getSize()*SIZE / np.getBandwidth_(); + activeFlows_.put(pkt.getID(), pkt); + super.sim_schedule(super.get_id(), duration, GridSimTags.FLOW_HOLD, new Integer(pkt.getID())); + System.out.println(super.get_name() + ".getDataFromLink() forecast flow end at " + (GridSim.clock() + + duration)); + + // if flow is just an ACK of a finished flow do not hold + } else if (pkt.getTag() == GridSimTags.FLOW_RETURN){ + duration = 0.0; + // send the data into entity input/output port + super.sim_schedule(inPort_, duration, tag, + io.getData() ); + } + + + } + } + } + + /** + * Processes a ping request + * @param pkt a packet for pinging + * @pre pkt != null + * @post $none + */ + private void processPingRequest(InfoPacket pkt) + { + // add more information to ping() packet + pkt.addHop( inPort_.get_dest() ); + pkt.addEntryTime( GridSim.clock() ); + + IO_data io = new IO_data( pkt, pkt.getSize(), inPort_.get_dest() ); + + // send this ping() packet to the entity + super.sim_schedule(inPort_, GridSimTags.SCHEDULE_NOW, + pkt.getTag(), io.getData()); + } + +} // end class + This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |