INET Framework for OMNeT++/OMNEST
|
#include <SCTPClient.h>
Classes | |
struct | pathStatus |
Public Types | |
typedef std::map< IPvXAddress, pathStatus > | SCTPPathStatus |
Public Member Functions | |
void | initialize () |
void | handleMessage (cMessage *msg) |
void | finish () |
void | handleTimer (cMessage *msg) |
void | setAssociation (SCTPAssociation *_assoc) |
void | setPrimaryPath (const char *addr) |
void | sendRequestArrived () |
void | sendQueueRequest () |
void | shutdownReceivedArrived (int32 connId) |
void | sendqueueFullArrived (int32 connId) |
void | sendqueueAbatedArrived (int32 connId, uint64 buffer) |
void | addressAddedArrived (int32 assocId, IPvXAddress remoteAddr) |
Utility functions | |
void | connect () |
void | close () |
void | setStatusString (const char *s) |
SCTPSocket::CallbackInterface callback methods | |
void | socketEstablished (int32 connId, void *yourPtr, uint64 buffer) |
void | socketDataArrived (int32 connId, void *yourPtr, cPacket *msg, bool urgent) |
void | socketDataNotificationArrived (int32 connId, void *yourPtr, cPacket *msg) |
void | socketPeerClosed (int32 connId, void *yourPtr) |
void | socketClosed (int32 connId, void *yourPtr) |
void | socketFailure (int32 connId, void *yourPtr, int32 code) |
void | socketStatusArrived (int32 connId, void *yourPtr, SCTPStatusInfo *status) |
Public Attributes | |
SCTPPathStatus | sctpPathStatus |
Protected Member Functions | |
void | sendRequest (bool last=true) |
Protected Attributes | |
SCTPSocket | socket |
SCTPAssociation * | assoc |
int32 | numSessions |
int32 | numBroken |
uint64 | packetsSent |
uint64 | packetsRcvd |
uint64 | bytesSent |
uint64 | echoedBytesSent |
uint64 | bytesRcvd |
uint64 | numRequestsToSend |
uint64 | numPacketsToReceive |
uint32 | numBytes |
int64 | bufferSize |
int32 | echoFactor |
int32 | queueSize |
uint32 | inStreams |
uint32 | outStreams |
bool | ordered |
bool | sendAllowed |
bool | timer |
bool | finishEndsSimulation |
cMessage * | timeMsg |
cMessage * | stopTimer |
cMessage * | primaryChangeTimer |
typedef std::map<IPvXAddress,pathStatus> SCTPClient::SCTPPathStatus |
void SCTPClient::addressAddedArrived | ( | int32 | assocId, |
IPvXAddress | remoteAddr | ||
) |
{
    // No-op: this client ignores the notification; assocId and remoteAddr are unused.
}
void SCTPClient::close | ( | ) |
Issues CLOSE command
Referenced by handleTimer(), socketDataArrived(), and socketPeerClosed().
{
    // Show the new state in the GUI first, then issue the CLOSE on the socket.
    setStatusString("closing");
    socket.close();
}
void SCTPClient::connect | ( | ) |
Issues an active OPEN to the address/port given as module parameters
Referenced by handleTimer().
{
    // Peer endpoint and stream counts all come from module parameters.
    const char *peerAddress = par("connectAddress");
    int32 peerPort = par("connectPort");

    inStreams = par("inboundStreams");
    outStreams = par("outboundStreams");
    socket.setInboundStreams(inStreams);
    socket.setOutboundStreams(outStreams);

    ev << "issuing OPEN command\n";
    setStatusString("connecting");
    ev << "connect to address " << peerAddress << "\n";

    // Active open towards the resolved peer; every call counts as one session.
    socket.connect(IPAddressResolver().resolve(peerAddress, 1),
                   peerPort,
                   (uint32)par("numRequestsPerSession"));
    ++numSessions;
}
void SCTPClient::finish | ( | ) |
Records basic statistics: numSessions, packetsSent, packetsRcvd, bytesSent, bytesRcvd. Redefine to record different or more statistics at the end of the simulation.
{
    // Release the self-messages owned by this module. Every pointer is reset
    // after deletion (previously only primaryChangeTimer was), so that code
    // testing these members later, such as socketClosed(), never sees a
    // dangling pointer.
    if (timeMsg)
    {
        if (timeMsg->isScheduled())
            cancelEvent(timeMsg);
        delete timeMsg;
        timeMsg = NULL;
    }
    if (stopTimer)
    {
        cancelEvent(stopTimer);
        delete stopTimer;
        stopTimer = NULL;
    }
    if (primaryChangeTimer)
    {
        cancelEvent(primaryChangeTimer);
        delete primaryChangeTimer;
        primaryChangeTimer = NULL;
    }

    // Print the session, byte and packet counters to the environment log.
    ev << getFullPath() << ": opened " << numSessions << " sessions\n";
    ev << getFullPath() << ": sent " << bytesSent << " bytes in " << packetsSent << " packets\n";
    ev << getFullPath() << ": received " << bytesRcvd << " bytes in " << packetsRcvd << " packets\n";
    sctpEV3<<"Client finished\n";
}
void SCTPClient::handleMessage | ( | cMessage * | msg | ) |
For self-messages it invokes handleTimer(); messages arriving from SCTP will get dispatched to the socketXXX() functions.
{
    // Anything that is not one of our own timers is handed to the socket,
    // which dispatches it to the socketXXX() callbacks.
    if (!msg->isSelfMessage())
    {
        socket.processMessage(PK(msg));
        return;
    }

    handleTimer(msg);
}
void SCTPClient::handleTimer | ( | cMessage * | msg | ) |
Invoked from handleMessage(). Should be redefined to handle self-messages.
Referenced by handleMessage().
{
    // Dispatch on the kind stored in the self-message.
    switch (msg->getKind())
    {
        case MSGKIND_CONNECT:
            ev << "starting session call connect\n";
            connect();
            break;

        case MSGKIND_SEND:
            // Send when requests remain, or unconditionally when `timer` is set
            // (initialize() sets it when a stop time is configured).
            if (((!timer && numRequestsToSend>0) || timer))
            {
                if (sendAllowed)
                {
                    sendRequest();
                    // The request counter is only maintained when not running on the stop timer.
                    if (!timer)
                        numRequestsToSend--;
                }
                // Re-arm the send timer after the configured think time.
                if ((simtime_t)par("thinkTime") > 0)
                    scheduleAt(simulation.getSimTime()+(simtime_t)par("thinkTime"), timeMsg);
                // All requests sent and no close delay configured: shut down now,
                // withdrawing the timer that may just have been re-armed above.
                if ((!timer && numRequestsToSend == 0) && (simtime_t)par("waitToClose")==0)
                {
                    socket.shutdown();
                    if (timeMsg->isScheduled())
                        cancelEvent(timeMsg);
                    if (finishEndsSimulation) {
                        endSimulation();
                    }
                }
            }
            // Nothing left to send: same shutdown sequence as above.
            else if ((!timer && numRequestsToSend == 0) && (simtime_t)par("waitToClose")==0)
            {
                socket.shutdown();
                if (timeMsg->isScheduled())
                    cancelEvent(timeMsg);
                if (finishEndsSimulation) {
                    endSimulation();
                }
            }
            break;

        case MSGKIND_ABORT:
            close();
            break;

        case MSGKIND_PRIMARY:
            setPrimaryPath((const char*)par("newPrimary"));
            break;

        case MSGKIND_STOP:
            // Stop time reached: stop sending and tear the association down.
            numRequestsToSend=0;
            sendAllowed = false;
            socket.abort();
            // NOTE(review): close() is issued twice after abort(); the second call
            // looks redundant -- confirm against SCTPSocket before removing it.
            socket.close();
            if (timeMsg->isScheduled())
                cancelEvent(timeMsg);
            socket.close();
            if (finishEndsSimulation) {
                endSimulation();
            }
            break;

        default:
            ev<<"MsgKind ="<<msg->getKind()<<" unknown\n";
            break;
    }
}
void SCTPClient::initialize | ( | ) |
Initialization.
{
    sctpEV3<<"initialize SCTP Client\n";

    // Reset and expose the statistics counters.
    numSessions = numBroken = packetsSent = packetsRcvd = bytesSent = echoedBytesSent = bytesRcvd = 0;
    WATCH(numSessions);
    WATCH(numBroken);
    WATCH(packetsSent);
    WATCH(packetsRcvd);
    WATCH(bytesSent);
    WATCH(bytesRcvd);

    // parameters
    const char *address = par("address");
    AddressVector addresses;
    {
        // strtok() writes NUL bytes into the buffer it scans. Tokenize a private
        // copy so the string owned by the "address" parameter is never modified.
        std::vector<char> addressList(address, address + strlen(address) + 1);
        for (char *token = strtok(&addressList[0], ","); token != NULL; token = strtok(NULL, ","))
            addresses.push_back(IPvXAddress(token));
    }
    int32 port = par("port");
    echoFactor = par("echoFactor");
    ordered = (bool)par("ordered");
    finishEndsSimulation = (bool)par("finishEndsSimulation");

    // An empty address parameter binds by port only; otherwise bind to every listed address.
    if (strcmp(address,"")==0)
    {
        socket.bind(port);
    }
    else
    {
        socket.bindx(addresses, port);
    }
    socket.setCallbackObject(this);
    socket.setOutputGate(gate("sctpOut"));
    setStatusString("waiting");

    // The application timer starts out as the "connect" trigger at startTime.
    timeMsg = new cMessage("CliAppTimer");
    numRequestsToSend = 0;
    numPacketsToReceive = 0;
    queueSize = par("queueSize");
    WATCH(numRequestsToSend);
    recordScalar("ums", (uint32) par("requestLength"));
    timeMsg->setKind(MSGKIND_CONNECT);
    scheduleAt((simtime_t)par("startTime"), timeMsg);
    sendAllowed = true;
    bufferSize = 0;

    // Optional stop timer; `timer` records whether one is in use.
    if ((simtime_t)par("stopTime")!=0)
    {
        stopTimer = new cMessage("StopTimer");
        stopTimer->setKind(MSGKIND_STOP);
        scheduleAt((simtime_t)par("stopTime"), stopTimer);
        timer = true;
    }
    else
    {
        timer = false;
        stopTimer = NULL;
    }

    // Optional timer that triggers a primary-path change.
    if ((simtime_t)par("primaryTime")!=0)
    {
        primaryChangeTimer = new cMessage("PrimaryTime");
        primaryChangeTimer->setKind(MSGKIND_PRIMARY);
        scheduleAt((simtime_t)par("primaryTime"), primaryChangeTimer);
    }
    else
    {
        primaryChangeTimer = NULL;
    }
}
void SCTPClient::sendqueueAbatedArrived | ( | int32 | connId, |
uint64 | buffer | ||
) |
{
    // The send queue has drained: record the reported room and resume sending.
    bufferSize = buffer;
    sendAllowed = true;

    // Keep sending while there is work (timer mode, or requests remaining),
    // sending is allowed, and either room is left or the reported buffer was 0.
    // NOTE(review): with timer set and buffer == 0 nothing in this loop clears
    // the condition -- confirm that combination cannot occur.
    while ((timer || numRequestsToSend > 0) && sendAllowed && (bufferSize > 0 || buffer == 0))
    {
        // Only the final counted request is flagged as "last".
        sendRequest(!timer && numRequestsToSend == 1);

        if (!timer && --numRequestsToSend == 0)
            sendAllowed = false;
    }

    // Everything sent and no close delay configured: shut down.
    if ((!timer && numRequestsToSend == 0) && (simtime_t)par("waitToClose")==0)
    {
        sctpEV3<<"socketEstablished:no more packets to send, call shutdown\n";
        socket.shutdown();
        if (timeMsg->isScheduled())
            cancelEvent(timeMsg);
        if (finishEndsSimulation)
            endSimulation();
    }
}
void SCTPClient::sendqueueFullArrived | ( | int32 | connId | ) |
{
    // Pause sending; sendqueueAbatedArrived() sets the flag again.
    sendAllowed = false;
}
void SCTPClient::sendQueueRequest | ( | ) |
Referenced by socketEstablished().
{
    // Control packet of kind SCTP_C_QUEUE_MSGS_LIMIT for this association.
    cPacket *request = new cPacket("Queue");
    request->setKind(SCTP_C_QUEUE_MSGS_LIMIT);

    // The control info carries the configured queue size and the association id.
    SCTPInfo *queueInfo = new SCTPInfo();
    queueInfo->setText(queueSize);
    queueInfo->setAssocId(socket.getConnectionId());
    request->setControlInfo(queueInfo);

    sctpEV3 << "Sending queue request ..." << endl;
    socket.sendRequest(request);
}
void SCTPClient::sendRequest | ( | bool | last = true | ) | [protected] |
Utility: sends a request to the server
Referenced by handleTimer(), sendqueueAbatedArrived(), sendRequestArrived(), and socketEstablished().
{
    // Payload size comes from the "requestLength" parameter, with a floor of one byte.
    uint32 length = par("requestLength");
    if (length < 1)
        length = 1;

    cPacket *envelope = new cPacket("AppData");
    SCTPSimpleMessage *payload = new SCTPSimpleMessage("data");

    // Fill the payload with a constant byte pattern.
    payload->setDataArraySize(length);
    for (uint32 pos = 0; pos < length; ++pos)
        payload->setData(pos, 'a');
    payload->setDataLen(length);
    payload->setByteLength(length);
    payload->setCreationTime(simulation.getSimTime());

    envelope->encapsulate(payload);
    envelope->setKind(ordered ? SCTP_C_SEND_ORDERED : SCTP_C_SEND_UNORDERED);

    // send SCTPMessage with SCTPSimpleMessage enclosed
    sctpEV3 << "Sending request ..." << endl;

    // Once the tracked buffer goes negative, this message is flagged as the last one.
    bufferSize -= length;
    if (bufferSize < 0)
        last = true;

    socket.send(envelope, last);
    bytesSent += length;
}
void SCTPClient::sendRequestArrived | ( | ) | [virtual] |
Reimplemented from SCTPSocket::CallbackInterface.
{
    int32 sentNow = 0;
    sctpEV3<<"sendRequestArrived numRequestsToSend="<<numRequestsToSend<<"\n";

    // Send up to queueSize messages; the one that fills the batch is flagged "last".
    while ((timer || numRequestsToSend > 0) && sentNow++ < queueSize && sendAllowed)
    {
        sendRequest(sentNow == queueSize);

        // In timer mode the request counter is not maintained.
        if (timer)
            continue;

        if (--numRequestsToSend != 0)
            continue;

        // Last counted request has gone out.
        sctpEV3<<"no more packets to send, call shutdown\n";
        socket.shutdown();
        if (timeMsg->isScheduled())
            cancelEvent(timeMsg);
        if (finishEndsSimulation)
            endSimulation();
    }
}
void SCTPClient::setAssociation | ( | SCTPAssociation * | _assoc | ) | [inline] |
{
    // Store the association pointer in the `assoc` member.
    assoc = _assoc;
};
void SCTPClient::setPrimaryPath | ( | const char * | addr | ) |
Referenced by handleTimer().
{
    // Requests a new primary path. The address is taken from, in order:
    // the `addr` argument, the "newPrimary" parameter, the "connectAddress" parameter.
    // The body previously referred to an undeclared `str`; it now uses the
    // declared parameter `addr`, and tolerates a NULL argument.
    cPacket* cmsg = new cPacket("CMSG-SetPrimary");
    SCTPPathInfo *pinfo = new SCTPPathInfo();

    if (addr == NULL || strcmp(addr, "") == 0)
    {
        addr = (const char*)par("newPrimary");
        if (strcmp(addr, "") == 0)
            addr = (const char*)par("connectAddress");
    }
    pinfo->setRemoteAddress(IPvXAddress(addr));

    pinfo->setAssocId(socket.getConnectionId());
    cmsg->setKind(SCTP_C_PRIMARY);
    cmsg->setControlInfo(pinfo);
    socket.sendNotification(cmsg);
}
void SCTPClient::setStatusString | ( | const char * | s | ) |
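When running under a GUI, displays the given string next to the icon. (The original text began with "Sends a GenericAppMsg of the given length", which does not describe this function and appears to be left over from another function's documentation.)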
Referenced by close(), connect(), initialize(), socketClosed(), socketEstablished(), and socketFailure().
{
    // The status text is only set when a GUI is attached.
    if (!ev.isGUI())
        return;

    getDisplayString().setTagArg("t", 0, s);
}
void SCTPClient::shutdownReceivedArrived | ( | int32 | connId | ) |
{
    // Requests are still pending: send nothing.
    if (numRequestsToSend != 0)
        return;

    // Send an SCTP_C_NO_OUTSTANDING notification for this association.
    cPacket *notice = new cPacket("Request");
    notice->setKind(SCTP_C_NO_OUTSTANDING);

    SCTPInfo *info = new SCTPInfo();
    info->setAssocId(connId);
    notice->setControlInfo(info);

    socket.sendNotification(notice);
}
void SCTPClient::socketClosed | ( | int32 | connId, |
void * | yourPtr | ||
) |
Does nothing but update statistics/status. Redefine if you want to do something else, such as opening a new connection.
{
    // *redefine* to start another session etc.
    ev << "connection closed\n";
    setStatusString("closed");

    // No primary-change timer exists: nothing more to do.
    if (!primaryChangeTimer)
        return;

    // Cancel and free the timer, and reset the pointer.
    cancelEvent(primaryChangeTimer);
    delete primaryChangeTimer;
    primaryChangeTimer = NULL;
}
void SCTPClient::socketDataArrived | ( | int32 | connId, |
void * | yourPtr, | ||
cPacket * | msg, | ||
bool | urgent | ||
) |
Does nothing but update statistics/status. Redefine to perform or schedule next sending. Beware: this function deletes the incoming message, which might not be what you want.
{
    // Handles a data packet delivered by SCTP: updates the receive counters,
    // optionally echoes the payload back, and closes once the configured number
    // of packets has been received. The incoming message is always deleted here;
    // previously it was deleted only on the echo path and leaked otherwise.
    packetsRcvd++;
    sctpEV3<<"Client received packet Nr "<<packetsRcvd<<" from SCTP\n";
    SCTPCommand* ind = check_and_cast<SCTPCommand*>(msg->removeControlInfo());
    bytesRcvd+=msg->getByteLength();

    if (echoFactor > 0)
    {
        // Echo a copy of the payload, keeping the ordering mode of the original.
        SCTPSimpleMessage *smsg=check_and_cast<SCTPSimpleMessage*>(msg->dup());
        cPacket* cmsg = new cPacket("SVData");
        echoedBytesSent+=smsg->getBitLength()/8;
        cmsg->encapsulate(smsg);
        if (ind->getSendUnordered())
            cmsg->setKind(SCTP_C_SEND_UNORDERED);
        else
            cmsg->setKind(SCTP_C_SEND_ORDERED);
        packetsSent++;
        socket.send(cmsg, 1);
    }

    // The packet is no longer needed on either path.
    delete msg;

    if ((long)par("numPacketsToReceive")>0)
    {
        numPacketsToReceive--;
        if (numPacketsToReceive == 0)
        {
            close();
        }
    }
    delete ind;
}
void SCTPClient::socketDataNotificationArrived | ( | int32 | connId, |
void * | yourPtr, | ||
cPacket * | msg | ||
) |
{
    // Answer the data notification with an SCTP_C_RECEIVE request that copies
    // the association id, stream id and message count of the indication.
    SCTPCommand *indication = check_and_cast<SCTPCommand *>(msg->removeControlInfo());

    cPacket *receiveRequest = new cPacket("CMSG-DataArr");
    SCTPSendCommand *receiveInfo = new SCTPSendCommand();
    receiveInfo->setAssocId(indication->getAssocId());
    receiveInfo->setSid(indication->getSid());
    receiveInfo->setNumMsgs(indication->getNumMsgs());
    delete indication;

    receiveRequest->setKind(SCTP_C_RECEIVE);
    receiveRequest->setControlInfo(receiveInfo);
    socket.sendNotification(receiveRequest);
}
void SCTPClient::socketEstablished | ( | int32 | connId, |
void * | yourPtr, | ||
uint64 | buffer | ||
) |
Does nothing but update statistics/status. Redefine to perform or schedule first sending.
{
    int32 burstCount = 0;

    ev<<"SCTPClient: connected\n";
    setStatusString("connected");
    bufferSize = buffer;

    // determine number of requests in this session
    numRequestsToSend = (long) par("numRequestsPerSession");
    numPacketsToReceive = (long) par("numPacketsToReceive");
    if (numRequestsToSend<1)
        numRequestsToSend = 0;
    sctpEV3<<"SCTPClient:numRequestsToSend="<<numRequestsToSend<<"\n";

    // Not in timer mode and nothing to send: the rest of the method does not apply.
    if (!timer && numRequestsToSend == 0)
        return;

    // perform first request (next one will be sent when reply arrives)
    if ((simtime_t)par("thinkTime") > 0)
    {
        // Paced mode: send one request now, then let the send timer drive the rest.
        if (sendAllowed)
        {
            sendRequest();
            if (!timer)
                numRequestsToSend--;
        }
        timeMsg->setKind(MSGKIND_SEND);
        scheduleAt(simulation.getSimTime()+(simtime_t)par("thinkTime"), timeMsg);
    }
    else if (queueSize > 0)
    {
        // Queue-limited mode: send an initial burst of up to 2*queueSize messages,
        // flagging the one that completes the burst as "last".
        const int32 burstLimit = queueSize*2;
        while ((timer || numRequestsToSend > 0) && burstCount++ < burstLimit && sendAllowed)
        {
            sendRequest(burstCount == burstLimit);
            if (!timer && --numRequestsToSend == 0)
                sendAllowed = false;
        }
        // Work remains and sending is still allowed: send the queue-limit request.
        if ((timer || numRequestsToSend > 0) && sendAllowed)
            sendQueueRequest();
    }
    else
    {
        // Buffer-limited mode: send while room is left, or while the reported buffer is 0.
        while ((timer || numRequestsToSend > 0) && sendAllowed && (bufferSize > 0 || buffer == 0))
        {
            sendRequest(!timer && numRequestsToSend == 1);
            if (!timer && --numRequestsToSend == 0)
                sendAllowed = false;
        }
    }

    // No packets expected but a close delay configured: schedule the deferred close.
    if ((!timer && numPacketsToReceive == 0) && (simtime_t)par("waitToClose")>0)
    {
        timeMsg->setKind(MSGKIND_ABORT);
        scheduleAt(simulation.getSimTime()+(simtime_t)par("waitToClose"), timeMsg);
    }

    // Everything sent and no close delay: shut down immediately.
    if ((!timer && numRequestsToSend == 0) && (simtime_t)par("waitToClose")==0)
    {
        sctpEV3<<"socketEstablished:no more packets to send, call shutdown\n";
        socket.shutdown();
        if (timeMsg->isScheduled())
            cancelEvent(timeMsg);
        if (finishEndsSimulation)
            endSimulation();
    }
}
void SCTPClient::socketFailure | ( | int32 | connId, |
void * | yourPtr, | ||
int32 | code | ||
) |
Does nothing but update statistics/status. Redefine if you want to try reconnecting after a delay.
{
    // Called when the connection breaks: records the failure and schedules a
    // reconnect after "reconnectInterval".
    // subclasses may override this function, and add code try to reconnect after a delay.
    ev << "connection broken\n";
    setStatusString("broken");
    numBroken++;

    // reconnect after a delay
    // timeMsg may still be pending (e.g. as a send or close timer); scheduling an
    // already-scheduled message is an error, so withdraw it first, as the other
    // handlers in this class do.
    if (timeMsg->isScheduled())
        cancelEvent(timeMsg);
    timeMsg->setKind(MSGKIND_CONNECT);
    scheduleAt(simulation.getSimTime()+(simtime_t)par("reconnectInterval"), timeMsg);
}
void SCTPClient::socketPeerClosed | ( | int32 | connId, |
void * | yourPtr | ||
) |
void SCTPClient::socketStatusArrived | ( | int32 | connId, |
void * | yourPtr, | ||
SCTPStatusInfo * | status | ||
) |
Redefine to handle incoming SCTPStatusInfo.
{
    // Records the reported status of a path in sctpPathStatus, keyed by path id.
    SCTPPathStatus::iterator i=sctpPathStatus.find(status->getPathId());
    if (i!=sctpPathStatus.end())
    {
        // Known path: update the stored entry in place. The previous code changed
        // a local copy of the entry and discarded it, so the map never saw updates.
        i->second.active = status->getActive();
    }
    else
    {
        // First report for this path: create the entry, not primary by default.
        struct pathStatus ps;
        ps.active = status->getActive();
        ps.pid = status->getPathId();
        ps.primaryPath = false;
        sctpPathStatus[ps.pid]=ps;
    }
}
SCTPAssociation* SCTPClient::assoc [protected] |
int64 SCTPClient::bufferSize [protected] |
Referenced by initialize(), sendqueueAbatedArrived(), sendRequest(), and socketEstablished().
uint64 SCTPClient::bytesRcvd [protected] |
Referenced by finish(), initialize(), and socketDataArrived().
uint64 SCTPClient::bytesSent [protected] |
Referenced by finish(), initialize(), and sendRequest().
uint64 SCTPClient::echoedBytesSent [protected] |
Referenced by initialize(), and socketDataArrived().
int32 SCTPClient::echoFactor [protected] |
Referenced by initialize(), and socketDataArrived().
bool SCTPClient::finishEndsSimulation [protected] |
Referenced by handleTimer(), initialize(), sendqueueAbatedArrived(), sendRequestArrived(), and socketEstablished().
uint32 SCTPClient::inStreams [protected] |
Referenced by connect().
int32 SCTPClient::numBroken [protected] |
Referenced by initialize(), and socketFailure().
uint32 SCTPClient::numBytes [protected] |
uint64 SCTPClient::numPacketsToReceive [protected] |
Referenced by initialize(), socketDataArrived(), and socketEstablished().
uint64 SCTPClient::numRequestsToSend [protected] |
Referenced by handleTimer(), initialize(), sendqueueAbatedArrived(), sendRequestArrived(), shutdownReceivedArrived(), and socketEstablished().
int32 SCTPClient::numSessions [protected] |
Referenced by connect(), finish(), and initialize().
bool SCTPClient::ordered [protected] |
Referenced by initialize(), and sendRequest().
uint32 SCTPClient::outStreams [protected] |
Referenced by connect().
uint64 SCTPClient::packetsRcvd [protected] |
Referenced by finish(), initialize(), and socketDataArrived().
uint64 SCTPClient::packetsSent [protected] |
Referenced by finish(), initialize(), and socketDataArrived().
cMessage* SCTPClient::primaryChangeTimer [protected] |
Referenced by finish(), initialize(), and socketClosed().
int32 SCTPClient::queueSize [protected] |
Referenced by initialize(), sendQueueRequest(), sendRequestArrived(), and socketEstablished().
Referenced by socketStatusArrived().
bool SCTPClient::sendAllowed [protected] |
Referenced by handleTimer(), initialize(), sendqueueAbatedArrived(), sendqueueFullArrived(), sendRequestArrived(), and socketEstablished().
SCTPSocket SCTPClient::socket [protected] |
Referenced by close(), connect(), handleMessage(), handleTimer(), initialize(), sendqueueAbatedArrived(), sendQueueRequest(), sendRequest(), sendRequestArrived(), setPrimaryPath(), shutdownReceivedArrived(), socketDataArrived(), socketDataNotificationArrived(), socketEstablished(), and socketPeerClosed().
cMessage* SCTPClient::stopTimer [protected] |
Referenced by finish(), and initialize().
cMessage* SCTPClient::timeMsg [protected] |
Referenced by finish(), handleTimer(), initialize(), sendqueueAbatedArrived(), sendRequestArrived(), socketEstablished(), and socketFailure().
bool SCTPClient::timer [protected] |
Referenced by handleTimer(), initialize(), sendqueueAbatedArrived(), sendRequestArrived(), and socketEstablished().