|
INET Framework for OMNeT++/OMNEST
|
#include <SCTPClient.h>
Classes | |
| struct | pathStatus |
Public Types | |
| typedef std::map< IPvXAddress, pathStatus > | SCTPPathStatus |
Public Member Functions | |
| void | initialize () |
| void | handleMessage (cMessage *msg) |
| void | finish () |
| void | handleTimer (cMessage *msg) |
| void | setAssociation (SCTPAssociation *_assoc) |
| void | setPrimaryPath (const char *addr) |
| void | sendRequestArrived () |
| void | sendQueueRequest () |
| void | shutdownReceivedArrived (int32 connId) |
| void | sendqueueFullArrived (int32 connId) |
| void | sendqueueAbatedArrived (int32 connId, uint64 buffer) |
| void | addressAddedArrived (int32 assocId, IPvXAddress remoteAddr) |
Utility functions | |
| void | connect () |
| void | close () |
| void | setStatusString (const char *s) |
SCTPSocket::CallbackInterface callback methods | |
| void | socketEstablished (int32 connId, void *yourPtr, uint64 buffer) |
| void | socketDataArrived (int32 connId, void *yourPtr, cPacket *msg, bool urgent) |
| void | socketDataNotificationArrived (int32 connId, void *yourPtr, cPacket *msg) |
| void | socketPeerClosed (int32 connId, void *yourPtr) |
| void | socketClosed (int32 connId, void *yourPtr) |
| void | socketFailure (int32 connId, void *yourPtr, int32 code) |
| void | socketStatusArrived (int32 connId, void *yourPtr, SCTPStatusInfo *status) |
Public Attributes | |
| SCTPPathStatus | sctpPathStatus |
Protected Member Functions | |
| void | sendRequest (bool last=true) |
Protected Attributes | |
| SCTPSocket | socket |
| SCTPAssociation * | assoc |
| int32 | numSessions |
| int32 | numBroken |
| uint64 | packetsSent |
| uint64 | packetsRcvd |
| uint64 | bytesSent |
| uint64 | echoedBytesSent |
| uint64 | bytesRcvd |
| uint64 | numRequestsToSend |
| uint64 | numPacketsToReceive |
| uint32 | numBytes |
| int64 | bufferSize |
| int32 | echoFactor |
| int32 | queueSize |
| uint32 | inStreams |
| uint32 | outStreams |
| bool | ordered |
| bool | sendAllowed |
| bool | timer |
| bool | finishEndsSimulation |
| cMessage * | timeMsg |
| cMessage * | stopTimer |
| cMessage * | primaryChangeTimer |
| typedef std::map<IPvXAddress,pathStatus> SCTPClient::SCTPPathStatus |
| void SCTPClient::addressAddedArrived | ( | int32 | assocId, |
| IPvXAddress | remoteAddr | ||
| ) |
{
}
| void SCTPClient::close | ( | ) |
Issues CLOSE command
Referenced by handleTimer(), socketDataArrived(), and socketPeerClosed().
{
setStatusString("closing");
socket.close();
}
| void SCTPClient::connect | ( | ) |
Issues an active OPEN to the address/port given as module parameters
Referenced by handleTimer().
{
const char *connectAddress = par("connectAddress");
int32 connectPort = par("connectPort");
inStreams = par("inboundStreams");
outStreams = par("outboundStreams");
socket.setInboundStreams(inStreams);
socket.setOutboundStreams(outStreams);
ev << "issuing OPEN command\n";
setStatusString("connecting");
ev<<"connect to address "<<connectAddress<<"\n";
socket.connect(IPAddressResolver().resolve(connectAddress, 1), connectPort, (uint32)par("numRequestsPerSession"));
numSessions++;
}
| void SCTPClient::finish | ( | ) |
Records basic statistics: numSessions, packetsSent, packetsRcvd, bytesSent, bytesRcvd. Redefine to record different or more statistics at the end of the simulation.
{
if (timeMsg->isScheduled())
cancelEvent(timeMsg);
delete timeMsg;
if (stopTimer)
{
cancelEvent(stopTimer);
delete stopTimer;
}
if (primaryChangeTimer)
{
cancelEvent(primaryChangeTimer);
delete primaryChangeTimer;
primaryChangeTimer = NULL;
}
ev << getFullPath() << ": opened " << numSessions << " sessions\n";
ev << getFullPath() << ": sent " << bytesSent << " bytes in " << packetsSent << " packets\n";
ev << getFullPath() << ": received " << bytesRcvd << " bytes in " << packetsRcvd << " packets\n";
sctpEV3<<"Client finished\n";
}
| void SCTPClient::handleMessage | ( | cMessage * | msg | ) |
For self-messages it invokes handleTimer(); messages arriving from SCTP will get dispatched to the socketXXX() functions.
{
if (msg->isSelfMessage())
handleTimer(msg);
else
{
socket.processMessage(PK(msg));
}
}
| void SCTPClient::handleTimer | ( | cMessage * | msg | ) |
Invoked from handleMessage(). Should be redefined to handle self-messages.
Referenced by handleMessage().
{
switch (msg->getKind())
{
case MSGKIND_CONNECT:
ev << "starting session call connect\n";
connect();
break;
case MSGKIND_SEND:
if (((!timer && numRequestsToSend>0) || timer))
{
if (sendAllowed)
{
sendRequest();
if (!timer)
numRequestsToSend--;
}
if ((simtime_t)par("thinkTime") > 0)
scheduleAt(simulation.getSimTime()+(simtime_t)par("thinkTime"), timeMsg);
if ((!timer && numRequestsToSend == 0) && (simtime_t)par("waitToClose")==0)
{
socket.shutdown();
if (timeMsg->isScheduled())
cancelEvent(timeMsg);
if (finishEndsSimulation) {
endSimulation();
}
}
}
else if ((!timer && numRequestsToSend == 0) && (simtime_t)par("waitToClose")==0)
{
socket.shutdown();
if (timeMsg->isScheduled())
cancelEvent(timeMsg);
if (finishEndsSimulation) {
endSimulation();
}
}
break;
case MSGKIND_ABORT:
close();
break;
case MSGKIND_PRIMARY:
setPrimaryPath((const char*)par("newPrimary"));
break;
case MSGKIND_STOP:
numRequestsToSend=0;
sendAllowed = false;
socket.abort();
socket.close();
if (timeMsg->isScheduled())
cancelEvent(timeMsg);
socket.close();
if (finishEndsSimulation) {
endSimulation();
}
break;
default:
ev<<"MsgKind ="<<msg->getKind()<<" unknown\n";
break;
}
}
| void SCTPClient::initialize | ( | ) |
Initialization.
{
const char * address;
char* token;
AddressVector addresses;
sctpEV3<<"initialize SCTP Client\n";
numSessions = numBroken = packetsSent = packetsRcvd = bytesSent = echoedBytesSent = bytesRcvd = 0;
WATCH(numSessions);
WATCH(numBroken);
WATCH(packetsSent);
WATCH(packetsRcvd);
WATCH(bytesSent);
WATCH(bytesRcvd);
// parameters
address=par("address");
token = strtok((char*)address,",");
while (token != NULL)
{
addresses.push_back(IPvXAddress(token));
token = strtok(NULL, ",");
}
int32 port = par("port");
echoFactor = par("echoFactor");
if (!echoFactor) echoFactor = false;
ordered = (bool)par("ordered");
finishEndsSimulation = (bool)par("finishEndsSimulation");
if (strcmp(address,"")==0)
{
socket.bind(port);
}
else
{
socket.bindx(addresses, port);
}
socket.setCallbackObject(this);
socket.setOutputGate(gate("sctpOut"));
setStatusString("waiting");
timeMsg = new cMessage("CliAppTimer");
numRequestsToSend = 0;
numPacketsToReceive = 0;
queueSize = par("queueSize");
WATCH(numRequestsToSend);
recordScalar("ums", (uint32) par("requestLength"));
timeMsg->setKind(MSGKIND_CONNECT);
scheduleAt((simtime_t)par("startTime"), timeMsg);
sendAllowed = true;
bufferSize = 0;
if ((simtime_t)par("stopTime")!=0)
{
stopTimer = new cMessage("StopTimer");
stopTimer->setKind(MSGKIND_STOP);
scheduleAt((simtime_t)par("stopTime"), stopTimer);
timer = true;
}
else
{
timer = false;
stopTimer = NULL;
}
if ((simtime_t)par("primaryTime")!=0)
{
primaryChangeTimer = new cMessage("PrimaryTime");
primaryChangeTimer->setKind(MSGKIND_PRIMARY);
scheduleAt((simtime_t)par("primaryTime"), primaryChangeTimer);
}
else
{
primaryChangeTimer = NULL;
}
}
| void SCTPClient::sendqueueAbatedArrived | ( | int32 | connId, |
| uint64 | buffer | ||
| ) |
{
bufferSize = buffer;
sendAllowed = true;
while ((((!timer && numRequestsToSend>0) || timer) && sendAllowed && bufferSize>0) ||
(((!timer && numRequestsToSend>0) || timer) && sendAllowed && buffer==0))
{
if (!timer && numRequestsToSend==1)
sendRequest(true);
else
sendRequest(false);
if (!timer && (--numRequestsToSend == 0))
sendAllowed = false;
}
if ((!timer && numRequestsToSend == 0) && (simtime_t)par("waitToClose")==0)
{
sctpEV3<<"socketEstablished:no more packets to send, call shutdown\n";
socket.shutdown();
if (timeMsg->isScheduled())
cancelEvent(timeMsg);
if (finishEndsSimulation) {
endSimulation();
}
}
}
| void SCTPClient::sendqueueFullArrived | ( | int32 | connId | ) |
{
sendAllowed = false;
}
| void SCTPClient::sendQueueRequest | ( | ) |
Referenced by socketEstablished().
{
cPacket* cmsg = new cPacket("Queue");
SCTPInfo* qinfo = new SCTPInfo();
qinfo->setText(queueSize);
cmsg->setKind(SCTP_C_QUEUE_MSGS_LIMIT);
qinfo->setAssocId(socket.getConnectionId());
cmsg->setControlInfo(qinfo);
sctpEV3 << "Sending queue request ..." << endl;
socket.sendRequest(cmsg);
}
| void SCTPClient::sendRequest | ( | bool | last = true | ) | [protected] |
Utility: sends a request to the server
Referenced by handleTimer(), sendqueueAbatedArrived(), sendRequestArrived(), and socketEstablished().
{
uint32 i, sendBytes;
sendBytes = par("requestLength");
if (sendBytes < 1)
sendBytes=1;
cPacket* cmsg = new cPacket("AppData");
SCTPSimpleMessage* msg=new SCTPSimpleMessage("data");
msg->setDataArraySize(sendBytes);
for (i=0; i<sendBytes; i++)
{
msg->setData(i, 'a');
}
msg->setDataLen(sendBytes);
msg->setByteLength(sendBytes);
msg->setCreationTime(simulation.getSimTime());
cmsg->encapsulate(msg);
if (ordered)
cmsg->setKind(SCTP_C_SEND_ORDERED);
else
cmsg->setKind(SCTP_C_SEND_UNORDERED);
// send SCTPMessage with SCTPSimpleMessage enclosed
sctpEV3 << "Sending request ..." << endl;
bufferSize -= sendBytes;
if (bufferSize < 0)
last = true;
socket.send(cmsg, last);
bytesSent+=sendBytes;
}
| void SCTPClient::sendRequestArrived | ( | ) | [virtual] |
Reimplemented from SCTPSocket::CallbackInterface.
{
int32 count = 0;
sctpEV3<<"sendRequestArrived numRequestsToSend="<<numRequestsToSend<<"\n";
while (((!timer && numRequestsToSend > 0) || timer) && count++ < queueSize && sendAllowed)
{
if (count == queueSize)
sendRequest();
else
sendRequest(false);
if (!timer)
numRequestsToSend--;
if ((!timer && numRequestsToSend == 0))
{
sctpEV3<<"no more packets to send, call shutdown\n";
socket.shutdown();
if (timeMsg->isScheduled())
cancelEvent(timeMsg);
if (finishEndsSimulation) {
endSimulation();
}
}
}
}
| void SCTPClient::setAssociation | ( | SCTPAssociation * | _assoc | ) | [inline] |
{assoc = _assoc;};
| void SCTPClient::setPrimaryPath | ( | const char * | addr | ) |
Referenced by handleTimer().
{
cPacket* cmsg = new cPacket("CMSG-SetPrimary");
SCTPPathInfo *pinfo = new SCTPPathInfo();
if (strcmp(str,"")!=0)
{
pinfo->setRemoteAddress(IPvXAddress(str));
}
else
{
str = (const char*)par("newPrimary");
if (strcmp(str, "")!=0)
pinfo->setRemoteAddress(IPvXAddress(str));
else
{
str = (const char*)par("connectAddress");
pinfo->setRemoteAddress(IPvXAddress(str));
}
}
pinfo->setAssocId(socket.getConnectionId());
cmsg->setKind(SCTP_C_PRIMARY);
cmsg->setControlInfo(pinfo);
socket.sendNotification(cmsg);
}
| void SCTPClient::setStatusString | ( | const char * | s | ) |
Sends a GenericAppMsg of the given length When running under GUI, it displays the given string next to the icon
Referenced by close(), connect(), initialize(), socketClosed(), socketEstablished(), and socketFailure().
{
if (ev.isGUI()) getDisplayString().setTagArg("t", 0, s);
}
| void SCTPClient::shutdownReceivedArrived | ( | int32 | connId | ) |
{
if (numRequestsToSend==0)
{
cPacket* cmsg = new cPacket("Request");
SCTPInfo* qinfo = new SCTPInfo();
cmsg->setKind(SCTP_C_NO_OUTSTANDING);
qinfo->setAssocId(connId);
cmsg->setControlInfo(qinfo);
socket.sendNotification(cmsg);
}
}
| void SCTPClient::socketClosed | ( | int32 | connId, |
| void * | yourPtr | ||
| ) |
Does nothing but update statistics/status. Redefine if you want to do something else, such as opening a new connection.
{
// *redefine* to start another session etc.
ev << "connection closed\n";
setStatusString("closed");
if (primaryChangeTimer)
{
cancelEvent(primaryChangeTimer);
delete primaryChangeTimer;
primaryChangeTimer = NULL;
}
}
| void SCTPClient::socketDataArrived | ( | int32 | connId, |
| void * | yourPtr, | ||
| cPacket * | msg, | ||
| bool | urgent | ||
| ) |
Does nothing but update statistics/status. Redefine to perform or schedule next sending. Beware: this funcion deletes the incoming message, which might not be what you want.
{
packetsRcvd++;
sctpEV3<<"Client received packet Nr "<<packetsRcvd<<" from SCTP\n";
SCTPCommand* ind = check_and_cast<SCTPCommand*>(msg->removeControlInfo());
bytesRcvd+=msg->getByteLength();
if (echoFactor > 0)
{
SCTPSimpleMessage *smsg=check_and_cast<SCTPSimpleMessage*>(msg->dup());
cPacket* cmsg = new cPacket("SVData");
echoedBytesSent+=smsg->getBitLength()/8;
cmsg->encapsulate(smsg);
if (ind->getSendUnordered())
cmsg->setKind(SCTP_C_SEND_UNORDERED);
else
cmsg->setKind(SCTP_C_SEND_ORDERED);
packetsSent++;
delete msg;
socket.send(cmsg, 1);
}
if ((long)par("numPacketsToReceive")>0)
{
numPacketsToReceive--;
if (numPacketsToReceive == 0)
{
close();
}
}
delete ind;
}
| void SCTPClient::socketDataNotificationArrived | ( | int32 | connId, |
| void * | yourPtr, | ||
| cPacket * | msg | ||
| ) |
{
SCTPCommand *ind = check_and_cast<SCTPCommand *>(msg->removeControlInfo());
cPacket* cmsg = new cPacket("CMSG-DataArr");
SCTPSendCommand *cmd = new SCTPSendCommand();
cmd->setAssocId(ind->getAssocId());
cmd->setSid(ind->getSid());
cmd->setNumMsgs(ind->getNumMsgs());
cmsg->setKind(SCTP_C_RECEIVE);
cmsg->setControlInfo(cmd);
delete ind;
socket.sendNotification(cmsg);
}
| void SCTPClient::socketEstablished | ( | int32 | connId, |
| void * | yourPtr, | ||
| uint64 | buffer | ||
| ) |
Does nothing but update statistics/status. Redefine to perform or schedule first sending.
{
int32 count = 0;
ev<<"SCTPClient: connected\n";
setStatusString("connected");
bufferSize = buffer;
// determine number of requests in this session
numRequestsToSend = (long) par("numRequestsPerSession");
numPacketsToReceive = (long) par("numPacketsToReceive");
if (numRequestsToSend<1)
numRequestsToSend = 0;
sctpEV3<<"SCTPClient:numRequestsToSend="<<numRequestsToSend<<"\n";
// perform first request (next one will be sent when reply arrives)
if ((numRequestsToSend>0 && !timer) || timer)
{
if ((simtime_t)par("thinkTime") > 0)
{
if (sendAllowed)
{
sendRequest();
if (!timer)
numRequestsToSend--;
}
timeMsg->setKind(MSGKIND_SEND);
scheduleAt(simulation.getSimTime()+(simtime_t)par("thinkTime"), timeMsg);
}
else
{
if (queueSize>0)
{
while (((!timer && numRequestsToSend > 0) || timer) && count++ < queueSize*2 && sendAllowed)
{
if (count == queueSize*2)
sendRequest();
else
sendRequest(false);
if (!timer)
{
if (--numRequestsToSend == 0)
sendAllowed = false;
}
}
if (((!timer && numRequestsToSend>0) || timer) && sendAllowed)
sendQueueRequest();
}
else
{
while ((((!timer && numRequestsToSend>0) || timer) && sendAllowed && bufferSize>0) ||
(((!timer && numRequestsToSend>0) || timer) && sendAllowed && buffer==0))
{
if (!timer && numRequestsToSend==1)
sendRequest(true);
else
sendRequest(false);
if (!timer && (--numRequestsToSend == 0))
sendAllowed = false;
}
}
}
if ((!timer && numPacketsToReceive == 0) && (simtime_t)par("waitToClose")>0)
{
timeMsg->setKind(MSGKIND_ABORT);
scheduleAt(simulation.getSimTime()+(simtime_t)par("waitToClose"), timeMsg);
}
if ((!timer && numRequestsToSend == 0) && (simtime_t)par("waitToClose")==0)
{
sctpEV3<<"socketEstablished:no more packets to send, call shutdown\n";
socket.shutdown();
if (timeMsg->isScheduled())
cancelEvent(timeMsg);
if (finishEndsSimulation) {
endSimulation();
}
}
}
}
| void SCTPClient::socketFailure | ( | int32 | connId, |
| void * | yourPtr, | ||
| int32 | code | ||
| ) |
Does nothing but update statistics/status. Redefine if you want to try reconnecting after a delay.
{
// subclasses may override this function, and add code try to reconnect after a delay.
ev << "connection broken\n";
setStatusString("broken");
numBroken++;
// reconnect after a delay
timeMsg->setKind(MSGKIND_CONNECT);
scheduleAt(simulation.getSimTime()+(simtime_t)par("reconnectInterval"), timeMsg);
}
| void SCTPClient::socketPeerClosed | ( | int32 | connId, |
| void * | yourPtr | ||
| ) |
| void SCTPClient::socketStatusArrived | ( | int32 | connId, |
| void * | yourPtr, | ||
| SCTPStatusInfo * | status | ||
| ) |
Redefine to handle incoming SCTPStatusInfo.
{
struct pathStatus ps;
SCTPPathStatus::iterator i=sctpPathStatus.find(status->getPathId());
if (i!=sctpPathStatus.end())
{
ps = i->second;
ps.active=status->getActive();
}
else
{
ps.active = status->getActive();
ps.pid = status->getPathId();
ps.primaryPath = false;
sctpPathStatus[ps.pid]=ps;
}
}
SCTPAssociation* SCTPClient::assoc [protected] |
int64 SCTPClient::bufferSize [protected] |
Referenced by initialize(), sendqueueAbatedArrived(), sendRequest(), and socketEstablished().
uint64 SCTPClient::bytesRcvd [protected] |
Referenced by finish(), initialize(), and socketDataArrived().
uint64 SCTPClient::bytesSent [protected] |
Referenced by finish(), initialize(), and sendRequest().
uint64 SCTPClient::echoedBytesSent [protected] |
Referenced by initialize(), and socketDataArrived().
int32 SCTPClient::echoFactor [protected] |
Referenced by initialize(), and socketDataArrived().
bool SCTPClient::finishEndsSimulation [protected] |
Referenced by handleTimer(), initialize(), sendqueueAbatedArrived(), sendRequestArrived(), and socketEstablished().
uint32 SCTPClient::inStreams [protected] |
Referenced by connect().
int32 SCTPClient::numBroken [protected] |
Referenced by initialize(), and socketFailure().
uint32 SCTPClient::numBytes [protected] |
uint64 SCTPClient::numPacketsToReceive [protected] |
Referenced by initialize(), socketDataArrived(), and socketEstablished().
uint64 SCTPClient::numRequestsToSend [protected] |
Referenced by handleTimer(), initialize(), sendqueueAbatedArrived(), sendRequestArrived(), shutdownReceivedArrived(), and socketEstablished().
int32 SCTPClient::numSessions [protected] |
Referenced by connect(), finish(), and initialize().
bool SCTPClient::ordered [protected] |
Referenced by initialize(), and sendRequest().
uint32 SCTPClient::outStreams [protected] |
Referenced by connect().
uint64 SCTPClient::packetsRcvd [protected] |
Referenced by finish(), initialize(), and socketDataArrived().
uint64 SCTPClient::packetsSent [protected] |
Referenced by finish(), initialize(), and socketDataArrived().
cMessage* SCTPClient::primaryChangeTimer [protected] |
Referenced by finish(), initialize(), and socketClosed().
int32 SCTPClient::queueSize [protected] |
Referenced by initialize(), sendQueueRequest(), sendRequestArrived(), and socketEstablished().
Referenced by socketStatusArrived().
bool SCTPClient::sendAllowed [protected] |
Referenced by handleTimer(), initialize(), sendqueueAbatedArrived(), sendqueueFullArrived(), sendRequestArrived(), and socketEstablished().
SCTPSocket SCTPClient::socket [protected] |
Referenced by close(), connect(), handleMessage(), handleTimer(), initialize(), sendqueueAbatedArrived(), sendQueueRequest(), sendRequest(), sendRequestArrived(), setPrimaryPath(), shutdownReceivedArrived(), socketDataArrived(), socketDataNotificationArrived(), socketEstablished(), and socketPeerClosed().
cMessage* SCTPClient::stopTimer [protected] |
Referenced by finish(), and initialize().
cMessage* SCTPClient::timeMsg [protected] |
Referenced by finish(), handleTimer(), initialize(), sendqueueAbatedArrived(), sendRequestArrived(), socketEstablished(), and socketFailure().
bool SCTPClient::timer [protected] |
Referenced by handleTimer(), initialize(), sendqueueAbatedArrived(), sendRequestArrived(), and socketEstablished().