INET Framework for OMNeT++/OMNEST
SCTPClient Class Reference

#include <SCTPClient.h>

Inheritance diagram for SCTPClient:
SCTPSocket::CallbackInterface

List of all members.

Classes

struct  pathStatus

Public Types

typedef std::map< IPvXAddress, pathStatus > SCTPPathStatus

Public Member Functions

void initialize ()
void handleMessage (cMessage *msg)
void finish ()
void handleTimer (cMessage *msg)
void setAssociation (SCTPAssociation *_assoc)
void setPrimaryPath (const char *addr)
void sendRequestArrived ()
void sendQueueRequest ()
void shutdownReceivedArrived (int32 connId)
void sendqueueFullArrived (int32 connId)
void sendqueueAbatedArrived (int32 connId, uint64 buffer)
void addressAddedArrived (int32 assocId, IPvXAddress remoteAddr)
Utility functions
void connect ()
void close ()
void setStatusString (const char *s)
SCTPSocket::CallbackInterface callback methods
void socketEstablished (int32 connId, void *yourPtr, uint64 buffer)
void socketDataArrived (int32 connId, void *yourPtr, cPacket *msg, bool urgent)
void socketDataNotificationArrived (int32 connId, void *yourPtr, cPacket *msg)
void socketPeerClosed (int32 connId, void *yourPtr)
void socketClosed (int32 connId, void *yourPtr)
void socketFailure (int32 connId, void *yourPtr, int32 code)
void socketStatusArrived (int32 connId, void *yourPtr, SCTPStatusInfo *status)

Public Attributes

SCTPPathStatus sctpPathStatus

Protected Member Functions

void sendRequest (bool last=true)

Protected Attributes

SCTPSocket socket
SCTPAssociation * assoc
int32 numSessions
int32 numBroken
uint64 packetsSent
uint64 packetsRcvd
uint64 bytesSent
uint64 echoedBytesSent
uint64 bytesRcvd
uint64 numRequestsToSend
uint64 numPacketsToReceive
uint32 numBytes
int64 bufferSize
int32 echoFactor
int32 queueSize
uint32 inStreams
uint32 outStreams
bool ordered
bool sendAllowed
bool timer
bool finishEndsSimulation
cMessage * timeMsg
cMessage * stopTimer
cMessage * primaryChangeTimer

Member Typedef Documentation


Member Function Documentation

void SCTPClient::addressAddedArrived ( int32  assocId,
IPvXAddress  remoteAddr 
)
{
    // No-op: both arguments are ignored and no state is changed.
}
void SCTPClient::close ( )

Issues CLOSE command

Referenced by handleTimer(), socketDataArrived(), and socketPeerClosed().

{
    // Update the status text first, then ask the socket to close.
    setStatusString("closing");
    socket.close();
}
void SCTPClient::connect ( )

Issues an active OPEN to the address/port given as module parameters

Referenced by handleTimer().

{
    // Destination of the association, read from the module parameters.
    const char *remoteAddress = par("connectAddress");
    int32 remotePort = par("connectPort");

    // The stream counts are cached in members and passed to the socket
    // before the connection is opened.
    inStreams = par("inboundStreams");
    socket.setInboundStreams(inStreams);
    outStreams = par("outboundStreams");
    socket.setOutboundStreams(outStreams);

    ev << "issuing OPEN command\n";
    setStatusString("connecting");
    ev << "connect to address " << remoteAddress << "\n";

    // Resolve the textual address and open the association.
    socket.connect(IPAddressResolver().resolve(remoteAddress, 1), remotePort, (uint32)par("numRequestsPerSession"));

    // One more session has been opened by this client.
    ++numSessions;
}
void SCTPClient::finish ( )

Records basic statistics: numSessions, packetsSent, packetsRcvd, bytesSent, bytesRcvd. Redefine to record different or more statistics at the end of the simulation.

{
    // Release the self-messages owned by this module. Every pointer is checked
    // before use and reset after deletion, so none of them is left dangling.
    if (timeMsg)
    {
        if (timeMsg->isScheduled())
            cancelEvent(timeMsg);
        delete timeMsg;
        timeMsg = NULL;
    }
    if (stopTimer)
    {
        cancelEvent(stopTimer);
        delete stopTimer;
        stopTimer = NULL;
    }
    if (primaryChangeTimer)
    {
        cancelEvent(primaryChangeTimer);
        delete primaryChangeTimer;
        primaryChangeTimer = NULL;
    }

    // Report the session, byte and packet counters.
    ev << getFullPath() << ": opened " << numSessions << " sessions\n";
    ev << getFullPath() << ": sent " << bytesSent << " bytes in " << packetsSent << " packets\n";
    ev << getFullPath() << ": received " << bytesRcvd << " bytes in " << packetsRcvd << " packets\n";
    sctpEV3<<"Client finished\n";
}
void SCTPClient::handleMessage ( cMessage *  msg)

For self-messages it invokes handleTimer(); messages arriving from SCTP will get dispatched to the socketXXX() functions.

{
    // Self-messages are this module's own timers.
    if (msg->isSelfMessage())
    {
        handleTimer(msg);
        return;
    }

    // Anything else is passed to the socket for processing.
    socket.processMessage(PK(msg));
}
void SCTPClient::handleTimer ( cMessage *  msg)

Invoked from handleMessage(). Should be redefined to handle self-messages.

Referenced by handleMessage().

{

    // Dispatch on the kind of the self-message.
    switch (msg->getKind())
    {
        // Open the association.
        case MSGKIND_CONNECT:
            ev << "starting session call connect\n";
            connect();
            break;
        // Periodic send: emit one request and re-arm the timer.
        case MSGKIND_SEND:

            // True when a stop timer bounds the session (timer) or requests are still pending.
            if (((!timer && numRequestsToSend>0) || timer))
            {
                if (sendAllowed)
                {
                    sendRequest();
                    // The request budget is only consumed when no stop timer is in use.
                    if (!timer)
                        numRequestsToSend--;
                }
                // Re-arm the send timer when a positive think time is configured.
                if ((simtime_t)par("thinkTime") > 0)
                    scheduleAt(simulation.getSimTime()+(simtime_t)par("thinkTime"), timeMsg);
                // Budget exhausted and no close delay configured: shut down now
                // (this also cancels the timer that may just have been re-armed).
                if ((!timer && numRequestsToSend == 0) && (simtime_t)par("waitToClose")==0)
                {
                    socket.shutdown();
                    if (timeMsg->isScheduled())
                        cancelEvent(timeMsg);
                    if (finishEndsSimulation) {
                        endSimulation();
                    }
                }
            }
            // NOTE(review): this branch repeats the shutdown block above verbatim;
            // it is reached when nothing is left to send on entry.
            else if ((!timer && numRequestsToSend == 0) && (simtime_t)par("waitToClose")==0)
            {
                    socket.shutdown();
                    if (timeMsg->isScheduled())
                        cancelEvent(timeMsg);
                    if (finishEndsSimulation) {
                        endSimulation();
                    }
            }
            break;
        // Close the association.
        case MSGKIND_ABORT:
            close();
            break;
        // Switch the primary path to the address given by the "newPrimary" parameter.
        case MSGKIND_PRIMARY:
            setPrimaryPath((const char*)par("newPrimary"));
            break;
        // Stop time reached: stop sending and tear the association down.
        case MSGKIND_STOP:
            numRequestsToSend=0;
            sendAllowed = false;
            // NOTE(review): abort() is followed by two close() calls (here and
            // after the timer is cancelled) -- confirm this is intended.
            socket.abort();
            socket.close();
            if (timeMsg->isScheduled())
                cancelEvent(timeMsg);
            socket.close();
            if (finishEndsSimulation) {
                endSimulation();
            }
            break;
        // Unknown kinds are only logged.
        default:
            ev<<"MsgKind ="<<msg->getKind()<<" unknown\n";
            break;
    }
}
void SCTPClient::initialize ( )

Initialization.

{
    sctpEV3<<"initialize SCTP Client\n";

    // Reset the statistics counters and make them inspectable.
    numSessions = numBroken = packetsSent = packetsRcvd = bytesSent = echoedBytesSent = bytesRcvd = 0;
    WATCH(numSessions);
    WATCH(numBroken);
    WATCH(packetsSent);
    WATCH(packetsRcvd);
    WATCH(bytesSent);
    WATCH(bytesRcvd);

    // parameters
    const char *address = par("address");
    // An empty "address" parameter selects plain bind(); evaluated before any
    // further par() calls are made.
    const bool noAddressGiven = (strcmp(address, "") == 0);

    // Split the comma-separated address list on a private copy: strtok() writes
    // NUL bytes into its input, and the string returned by par() must not be
    // modified.
    AddressVector addresses;
    char *addressList = new char[strlen(address) + 1];
    strcpy(addressList, address);
    try
    {
        for (char *token = strtok(addressList, ","); token != NULL; token = strtok(NULL, ","))
            addresses.push_back(IPvXAddress(token));
    }
    catch (...)
    {
        // Do not leak the scratch copy if building an address fails.
        delete [] addressList;
        throw;
    }
    delete [] addressList;

    int32 port = par("port");
    echoFactor = par("echoFactor");
    ordered = (bool)par("ordered");
    finishEndsSimulation = (bool)par("finishEndsSimulation");
    if (noAddressGiven)
    {
        socket.bind(port);
    }
    else
    {
        socket.bindx(addresses, port);
    }

    socket.setCallbackObject(this);
    socket.setOutputGate(gate("sctpOut"));
    setStatusString("waiting");

    // The client timer starts out as the connect timer, due at "startTime".
    timeMsg = new cMessage("CliAppTimer");
    numRequestsToSend = 0;
    numPacketsToReceive = 0;
    queueSize = par("queueSize");
    WATCH(numRequestsToSend);
    recordScalar("ums", (uint32) par("requestLength"));
    timeMsg->setKind(MSGKIND_CONNECT);
    scheduleAt((simtime_t)par("startTime"), timeMsg);
    sendAllowed = true;
    bufferSize = 0;

    // A non-zero "stopTime" arms the stop timer and sets the timer flag.
    if ((simtime_t)par("stopTime")!=0)
    {
        stopTimer = new cMessage("StopTimer");
        stopTimer->setKind(MSGKIND_STOP);
        scheduleAt((simtime_t)par("stopTime"), stopTimer);
        timer = true;
    }
    else
    {
        timer = false;
        stopTimer = NULL;
    }

    // A non-zero "primaryTime" schedules a primary-path change.
    if ((simtime_t)par("primaryTime")!=0)
    {
        primaryChangeTimer = new cMessage("PrimaryTime");
        primaryChangeTimer->setKind(MSGKIND_PRIMARY);
        scheduleAt((simtime_t)par("primaryTime"), primaryChangeTimer);
    }
    else
    {
        primaryChangeTimer = NULL;
    }
}
void SCTPClient::sendqueueAbatedArrived ( int32  connId,
uint64  buffer 
)
{
    // Record the reported buffer value and allow sending again.
    bufferSize = buffer;
    sendAllowed = true;

    // Keep sending while requests remain (or a stop timer bounds the session),
    // sending is allowed, and either bufferSize is still positive or the
    // reported buffer value was 0. sendRequest() decrements bufferSize.
    while ((timer || numRequestsToSend > 0) && sendAllowed && (bufferSize > 0 || buffer == 0))
    {
        // Only the final request of a count-limited session is passed last = true.
        sendRequest(!timer && numRequestsToSend == 1);
        if (!timer && (--numRequestsToSend == 0))
            sendAllowed = false;
    }

    // Request budget exhausted and no close delay configured: shut down.
    if ((!timer && numRequestsToSend == 0) && (simtime_t)par("waitToClose")==0)
    {
        sctpEV3<<"sendqueueAbatedArrived:no more packets to send, call shutdown\n";
        socket.shutdown();
        if (timeMsg->isScheduled())
            cancelEvent(timeMsg);
        if (finishEndsSimulation) {
            endSimulation();
        }
    }
}
void SCTPClient::sendqueueFullArrived ( int32  connId)
{
    // Block further sending; sendqueueAbatedArrived() sets the flag again.
    sendAllowed = false;
}
void SCTPClient::sendQueueRequest ( )

Referenced by socketEstablished().

{
    cPacket* request = new cPacket("Queue");
    SCTPInfo* queueInfo = new SCTPInfo();

    // Control info: the configured queue size and the association it applies to.
    queueInfo->setText(queueSize);
    queueInfo->setAssocId(socket.getConnectionId());

    // The packet itself is an SCTP_C_QUEUE_MSGS_LIMIT command.
    request->setKind(SCTP_C_QUEUE_MSGS_LIMIT);
    request->setControlInfo(queueInfo);

    sctpEV3 << "Sending queue request ..." << endl;
    socket.sendRequest(request);
}
void SCTPClient::sendRequest ( bool  last = true) [protected]

Utility: sends a request to the server

Referenced by handleTimer(), sendqueueAbatedArrived(), sendRequestArrived(), and socketEstablished().

{
    // Payload size comes from "requestLength"; at least one byte is always sent.
    uint32 payloadLength;
    payloadLength = par("requestLength");
    if (payloadLength < 1)
        payloadLength = 1;

    cPacket* cmsg = new cPacket("AppData");
    SCTPSimpleMessage* payload = new SCTPSimpleMessage("data");

    // Fill the payload with 'a' characters and stamp its length and creation time.
    payload->setDataArraySize(payloadLength);
    for (uint32 pos = 0; pos < payloadLength; ++pos)
        payload->setData(pos, 'a');
    payload->setDataLen(payloadLength);
    payload->setByteLength(payloadLength);
    payload->setCreationTime(simulation.getSimTime());
    cmsg->encapsulate(payload);

    // The command kind follows the "ordered" setting.
    if (!ordered)
        cmsg->setKind(SCTP_C_SEND_UNORDERED);
    else
        cmsg->setKind(SCTP_C_SEND_ORDERED);

    sctpEV3 << "Sending request ..." << endl;

    // Charge the payload against bufferSize; once it drops below zero the
    // message is sent with last = true regardless of the argument.
    bufferSize -= payloadLength;
    if (bufferSize < 0)
        last = true;
    socket.send(cmsg, last);
    bytesSent += payloadLength;
}
void SCTPClient::sendRequestArrived ( ) [virtual]

Reimplemented from SCTPSocket::CallbackInterface.

{
    // Number of requests attempted in this burst (bounded by queueSize).
    int32 burst = 0;

    sctpEV3<<"sendRequestArrived numRequestsToSend="<<numRequestsToSend<<"\n";
    for (;;)
    {
        // Stop when the request budget is used up (unless a stop timer bounds the session),
        if (!timer && numRequestsToSend == 0)
            break;
        // when queueSize requests have already been attempted,
        if (burst++ >= queueSize)
            break;
        // or when sending is currently not allowed.
        if (!sendAllowed)
            break;

        // The request that completes the burst is passed last = true.
        sendRequest(burst == queueSize);

        // With a stop timer there is no budget to maintain.
        if (timer)
            continue;

        numRequestsToSend--;
        if (numRequestsToSend == 0)
        {
            sctpEV3<<"no more packets to send, call shutdown\n";
            socket.shutdown();
            if (timeMsg->isScheduled())
                cancelEvent(timeMsg);
            if (finishEndsSimulation) {
                endSimulation();
            }
        }
    }
}
void SCTPClient::setAssociation ( SCTPAssociation _assoc) [inline]
{assoc = _assoc;};
void SCTPClient::setPrimaryPath ( const char *  addr)

Referenced by handleTimer().

{
    // Choose the new primary address: the explicit argument if non-empty,
    // otherwise the "newPrimary" parameter, otherwise "connectAddress".
    const char *primary = addr;
    if (primary == NULL || strcmp(primary, "") == 0)
    {
        primary = (const char*)par("newPrimary");
        if (strcmp(primary, "") == 0)
            primary = (const char*)par("connectAddress");
    }

    // Build the address before allocating anything, so that nothing has been
    // allocated yet if constructing it fails.
    IPvXAddress remoteAddress(primary);

    // Send an SCTP_C_PRIMARY notification carrying the address and association id.
    cPacket* cmsg = new cPacket("CMSG-SetPrimary");
    SCTPPathInfo *pinfo = new SCTPPathInfo();
    pinfo->setRemoteAddress(remoteAddress);
    pinfo->setAssocId(socket.getConnectionId());
    cmsg->setKind(SCTP_C_PRIMARY);
    cmsg->setControlInfo(pinfo);
    socket.sendNotification(cmsg);
}
void SCTPClient::setStatusString ( const char *  s)

When running under a GUI, displays the given string next to the module icon.

Referenced by close(), connect(), initialize(), socketClosed(), socketEstablished(), and socketFailure().

{
    // Only touch the display string when a GUI is present; sets argument 0 of the "t" tag to s.
    if (ev.isGUI()) getDisplayString().setTagArg("t", 0, s);
}
void SCTPClient::shutdownReceivedArrived ( int32  connId)
{
    // Requests are still pending: nothing is sent.
    if (numRequestsToSend != 0)
        return;

    // Send an SCTP_C_NO_OUTSTANDING notification for the given association.
    cPacket* notification = new cPacket("Request");
    SCTPInfo* info = new SCTPInfo();
    info->setAssocId(connId);
    notification->setKind(SCTP_C_NO_OUTSTANDING);
    notification->setControlInfo(info);
    socket.sendNotification(notification);
}
void SCTPClient::socketClosed ( int32  connId,
void *  yourPtr 
)

Does nothing but update statistics/status. Redefine if you want to do something else, such as opening a new connection.

{
    // Subclasses may redefine this, e.g. to start another session.
    ev << "connection closed\n";
    setStatusString("closed");

    // No primary-change timer to clean up.
    if (primaryChangeTimer == NULL)
        return;

    // Cancel and release the primary-change timer.
    cancelEvent(primaryChangeTimer);
    delete primaryChangeTimer;
    primaryChangeTimer = NULL;
}
void SCTPClient::socketDataArrived ( int32  connId,
void *  yourPtr,
cPacket *  msg,
bool  urgent 
)

Does nothing but update statistics/status. Redefine to perform or schedule next sending. Beware: this function deletes the incoming message, which might not be what you want.

{
    // Count the packet and its bytes; the control info is detached and owned here until deleted below.
    packetsRcvd++;
    sctpEV3<<"Client received packet Nr "<<packetsRcvd<<" from SCTP\n";
    SCTPCommand* ind = check_and_cast<SCTPCommand*>(msg->removeControlInfo());
    bytesRcvd+=msg->getByteLength();
    // Echo mode: send a duplicate of the received message back, keeping its ordered/unordered flag.
    if (echoFactor > 0)
    {
        SCTPSimpleMessage *smsg=check_and_cast<SCTPSimpleMessage*>(msg->dup());
        cPacket* cmsg = new cPacket("SVData");
        echoedBytesSent+=smsg->getBitLength()/8;
        cmsg->encapsulate(smsg);
        if (ind->getSendUnordered())
            cmsg->setKind(SCTP_C_SEND_UNORDERED);
        else
            cmsg->setKind(SCTP_C_SEND_ORDERED);
        packetsSent++;
        // NOTE(review): msg is deleted only on this echo path; when echoFactor <= 0
        // it is not deleted in this function -- confirm who owns msg in that case.
        delete msg;
        socket.send(cmsg, 1);
    }
    // With a positive "numPacketsToReceive" parameter, close once the countdown reaches zero.
    if ((long)par("numPacketsToReceive")>0)
    {
        numPacketsToReceive--;
        if (numPacketsToReceive == 0)
        {
            close();
        }
    }
    delete ind;
}
void SCTPClient::socketDataNotificationArrived ( int32  connId,
void *  yourPtr,
cPacket *  msg 
)
{
    // Detach the indication that describes the announced data.
    SCTPCommand *indication = check_and_cast<SCTPCommand *>(msg->removeControlInfo());

    cPacket* receiveRequest = new cPacket("CMSG-DataArr");
    SCTPSendCommand *receiveInfo = new SCTPSendCommand();

    // Copy association id, stream id and message count into the command's control info.
    receiveInfo->setAssocId(indication->getAssocId());
    receiveInfo->setSid(indication->getSid());
    receiveInfo->setNumMsgs(indication->getNumMsgs());
    delete indication;

    // Send it as an SCTP_C_RECEIVE notification.
    receiveRequest->setKind(SCTP_C_RECEIVE);
    receiveRequest->setControlInfo(receiveInfo);
    socket.sendNotification(receiveRequest);
}
void SCTPClient::socketEstablished ( int32  connId,
void *  yourPtr,
uint64  buffer 
)

Does nothing but update statistics/status. Redefine to perform or schedule first sending.

{
    ev<<"SCTPClient: connected\n";
    setStatusString("connected");
    bufferSize = buffer;

    // Load the per-session counters from the module parameters.
    numRequestsToSend = (long) par("numRequestsPerSession");
    numPacketsToReceive = (long) par("numPacketsToReceive");
    if (numRequestsToSend<1)
        numRequestsToSend = 0;
    sctpEV3<<"SCTPClient:numRequestsToSend="<<numRequestsToSend<<"\n";

    // Nothing further happens unless requests are pending or a stop timer
    // bounds the session.
    if (!timer && numRequestsToSend == 0)
        return;

    if ((simtime_t)par("thinkTime") > 0)
    {
        // Paced mode: send one request now and arm the send timer for the next one.
        if (sendAllowed)
        {
            sendRequest();
            if (!timer)
                numRequestsToSend--;
        }
        timeMsg->setKind(MSGKIND_SEND);
        scheduleAt(simulation.getSimTime()+(simtime_t)par("thinkTime"), timeMsg);
    }
    else if (queueSize > 0)
    {
        // Queue-limited mode: send up to 2*queueSize requests, then issue a queue request.
        int32 count = 0;
        while ((timer || numRequestsToSend > 0) && count++ < queueSize*2 && sendAllowed)
        {
            // The request that completes the burst is passed last = true.
            sendRequest(count == queueSize*2);
            if (!timer && --numRequestsToSend == 0)
                sendAllowed = false;
        }
        if ((timer || numRequestsToSend > 0) && sendAllowed)
            sendQueueRequest();
    }
    else
    {
        // Buffer-limited mode: send while bufferSize is positive, or
        // unconditionally when the reported buffer value is 0.
        while ((timer || numRequestsToSend > 0) && sendAllowed && (bufferSize > 0 || buffer == 0))
        {
            // Only the final request of a count-limited session is passed last = true.
            sendRequest(!timer && numRequestsToSend == 1);
            if (!timer && (--numRequestsToSend == 0))
                sendAllowed = false;
        }
    }

    // No packets expected and a close delay configured: arm the timer as an abort timer.
    if ((!timer && numPacketsToReceive == 0) && (simtime_t)par("waitToClose")>0)
    {
        timeMsg->setKind(MSGKIND_ABORT);
        scheduleAt(simulation.getSimTime()+(simtime_t)par("waitToClose"), timeMsg);
    }

    // Request budget exhausted and no close delay configured: shut down immediately.
    if ((!timer && numRequestsToSend == 0) && (simtime_t)par("waitToClose")==0)
    {
        sctpEV3<<"socketEstablished:no more packets to send, call shutdown\n";
        socket.shutdown();
        if (timeMsg->isScheduled())
            cancelEvent(timeMsg);
        if (finishEndsSimulation) {
            endSimulation();
        }
    }
}
void SCTPClient::socketFailure ( int32  connId,
void *  yourPtr,
int32  code 
)

Does nothing but update statistics/status. Redefine if you want to try reconnecting after a delay.

{
    // Subclasses may override this to change the reconnect policy.
    ev << "connection broken\n";
    setStatusString("broken");
    ++numBroken;

    // Re-arm the client timer as a connect timer due after "reconnectInterval".
    timeMsg->setKind(MSGKIND_CONNECT);
    const simtime_t reconnectDelay = (simtime_t)par("reconnectInterval");
    scheduleAt(simulation.getSimTime() + reconnectDelay, timeMsg);
}
void SCTPClient::socketPeerClosed ( int32  connId,
void *  yourPtr 
)

Since remote SCTP closed, invokes close(). Redefine if you want to do something else.

{
    // Only react while the socket is in the PEER_CLOSED state.
    if (socket.getState() != SCTPSocket::PEER_CLOSED)
        return;

    ev << "remote SCTP closed, closing here as well\n";
    close();
}
void SCTPClient::socketStatusArrived ( int32  connId,
void *  yourPtr,
SCTPStatusInfo *  status 
)

Redefine to handle incoming SCTPStatusInfo.

{
    SCTPPathStatus::iterator i = sctpPathStatus.find(status->getPathId());
    if (i != sctpPathStatus.end())
    {
        // Known path: update the stored entry in place (writing to a copy
        // would discard the new state).
        i->second.active = status->getActive();
    }
    else
    {
        // First report for this path: create its entry, not marked as primary.
        struct pathStatus ps;
        ps.active = status->getActive();
        ps.pid = status->getPathId();
        ps.primaryPath = false;
        sctpPathStatus[ps.pid] = ps;
    }
}

Member Data Documentation

uint64 SCTPClient::bytesRcvd [protected]
uint64 SCTPClient::bytesSent [protected]

Referenced by finish(), initialize(), and sendRequest().

uint64 SCTPClient::echoedBytesSent [protected]

Referenced by initialize(), and socketDataArrived().

int32 SCTPClient::echoFactor [protected]

Referenced by initialize(), and socketDataArrived().

uint32 SCTPClient::inStreams [protected]

Referenced by connect().

int32 SCTPClient::numBroken [protected]

Referenced by initialize(), and socketFailure().

uint32 SCTPClient::numBytes [protected]
int32 SCTPClient::numSessions [protected]

Referenced by connect(), finish(), and initialize().

bool SCTPClient::ordered [protected]

Referenced by initialize(), and sendRequest().

uint32 SCTPClient::outStreams [protected]

Referenced by connect().

uint64 SCTPClient::packetsRcvd [protected]
uint64 SCTPClient::packetsSent [protected]
cMessage* SCTPClient::primaryChangeTimer [protected]

Referenced by finish(), initialize(), and socketClosed().

cMessage* SCTPClient::stopTimer [protected]

Referenced by finish(), and initialize().


The documentation for this class was generated from the following files: