|
INET Framework for OMNeT++/OMNEST
|
#include <SCTPAssociation.h>
Classes | |
| struct | calcBytesToSend |
| struct | congestionControlFunctions |
| struct | counter |
| struct | streamSchedulingFunctions |
Public Member Functions | |
| SCTPAssociation (SCTP *mod, int32 appGateIndex, int32 assocId) | |
| ~SCTPAssociation () | |
| void | sendOnPath (SCTPPathVariables *pathId, const bool firstPass=true) |
| void | sendOnAllPaths (SCTPPathVariables *firstPath) |
Static Public Member Functions | |
| static const char * | indicationName (const int32 code) |
Public Attributes | |
| int32 | appGateIndex |
| int32 | assocId |
| IPvXAddress | remoteAddr |
| IPvXAddress | localAddr |
| uint16 | localPort |
| uint16 | remotePort |
| uint32 | localVTag |
| uint32 | peerVTag |
| bool | listen |
| cMessage * | T1_InitTimer |
| cMessage * | T2_ShutdownTimer |
| cMessage * | T5_ShutdownGuardTimer |
| cMessage * | SackTimer |
| cMessage * | StartTesting |
Protected Types | |
| typedef struct SCTPAssociation::streamSchedulingFunctions | SSFunctions |
Protected Member Functions | |
| void | startTimer (cMessage *timer, const simtime_t &timeout) |
| SCTPAssociation * | cloneAssociation () |
| void | initAssociation (SCTPOpenCommand *openCmd) |
| bool | tsnIsDuplicate (const uint32 tsn) const |
| bool | advanceCtsna () |
| bool | updateGapList (const uint32 tsn) |
| void | removeFromGapList (const uint32 removedTsn) |
| bool | makeRoomForTsn (const uint32 tsn, const uint32 length, const bool uBit) |
| void | sendInit () |
| void | sendInitAck (SCTPInitChunk *initchunk) |
| void | sendCookieEcho (SCTPInitAckChunk *initackchunk) |
| void | sendCookieAck (const IPvXAddress &dest) |
| void | sendAbort () |
| void | sendHeartbeat (const SCTPPathVariables *path) |
| void | sendHeartbeatAck (const SCTPHeartbeatChunk *heartbeatChunk, const IPvXAddress &src, const IPvXAddress &dest) |
| void | sendSack () |
| void | sendShutdown () |
| void | sendShutdownAck (const IPvXAddress &dest) |
| void | sendShutdownComplete () |
| SCTPSackChunk * | createSack () |
| void | retransmitInit () |
| void | retransmitCookieEcho () |
| void | retransmitShutdown () |
| void | retransmitShutdownAck () |
| void | sendToIP (SCTPMessage *sctpmsg, const IPvXAddress &dest, const bool qs=false) |
| void | sendToIP (SCTPMessage *sctpmsg, const bool qs=false) |
| void | recordInPathVectors (SCTPMessage *pMsg, const IPvXAddress &rDest) |
| void | scheduleSack () |
| void | signalConnectionTimeout () |
| void | scheduleTimeout (cMessage *msg, const simtime_t &timeout) |
| cMessage * | cancelEvent (cMessage *msg) |
| void | sendToApp (cPacket *msg) |
| void | sendIndicationToApp (const int32 code, const int32 value=0) |
| void | sendEstabIndicationToApp () |
| void | pushUlp () |
| void | sendDataArrivedNotification (uint16 sid) |
| void | putInDeliveryQ (uint16 sid) |
| void | printConnBrief () |
| void | addPath (const IPvXAddress &addr) |
| SCTPPathVariables * | getNextPath (const SCTPPathVariables *oldPath) const |
| const IPvXAddress & | getNextAddress (const SCTPPathVariables *oldPath) const |
| SCTPPathVariables * | getNextDestination (SCTPDataVariables *chunk) const |
| void | bytesAllowedToSend (SCTPPathVariables *path, const bool firstPass) |
| void | pathStatusIndication (const SCTPPathVariables *path, const bool status) |
| bool | allPathsInactive () const |
| SCTPDataChunk * | transformDataChunk (SCTPDataVariables *chunk) |
| SCTPDataVariables * | makeVarFromMsg (SCTPDataChunk *datachunk) |
| int32 | streamScheduler (bool peek) |
| void | initStreams (uint32 inStreams, uint32 outStreams) |
| int32 | numUsableStreams () |
| void | process_QUEUE_MSGS_LIMIT (const SCTPCommand *sctpCommand) |
| void | process_QUEUE_BYTES_LIMIT (const SCTPCommand *sctpCommand) |
| int32 | getOutstandingBytes () const |
| uint32 | dequeueAckedChunks (const uint32 tsna, SCTPPathVariables *path, simtime_t &rttEstimation) |
| SCTPDataMsg * | peekOutboundDataMsg () |
| SCTPDataVariables * | peekAbandonedChunk (const SCTPPathVariables *path) |
| SCTPDataVariables * | getOutboundDataChunk (const SCTPPathVariables *path, const int32 availableSpace, const int32 availableCwnd) |
| SCTPDataMsg * | dequeueOutboundDataMsg (const int32 availableSpace, const int32 availableCwnd) |
| bool | nextChunkFitsIntoPacket (int32 bytes) |
| void | putInTransmissionQ (uint32 tsn, SCTPDataVariables *chunk) |
| void | pmStartPathManagement () |
| void | pmDataIsSentOn (SCTPPathVariables *path) |
| void | pmClearPathCounter (SCTPPathVariables *path) |
| void | pmRttMeasurement (SCTPPathVariables *path, const simtime_t &rttEstimation) |
| void | disposeOf (SCTPMessage *sctpmsg) |
| void | tsnWasReneged (SCTPDataVariables *chunk, const int type) |
| void | printOutstandingTsns () |
| void | initCCParameters (SCTPPathVariables *path) |
| void | updateFastRecoveryStatus (const uint32 lastTsnAck) |
| void | cwndUpdateAfterSack () |
| void | cwndUpdateAfterCwndTimeout (SCTPPathVariables *path) |
| void | cwndUpdateAfterRtxTimeout (SCTPPathVariables *path) |
| void | cwndUpdateMaxBurst (SCTPPathVariables *path) |
| void | cwndUpdateBytesAcked (SCTPPathVariables *path, const uint32 ackedBytes, const bool ctsnaAdvanced) |
FSM transitions: analysing events and executing state transitions | |
| SCTPEventCode | preanalyseAppCommandEvent (int32 commandCode) |
| bool | performStateTransition (const SCTPEventCode &event) |
| void | stateEntered (int32 state) |
Processing app commands. Invoked from processAppCommand(). | |
| void | process_ASSOCIATE (SCTPEventCode &event, SCTPCommand *sctpCommand, cPacket *msg) |
| void | process_OPEN_PASSIVE (SCTPEventCode &event, SCTPCommand *sctpCommand, cPacket *msg) |
| void | process_SEND (SCTPEventCode &event, SCTPCommand *sctpCommand, cPacket *msg) |
| void | process_CLOSE (SCTPEventCode &event) |
| void | process_ABORT (SCTPEventCode &event) |
| void | process_STATUS (SCTPEventCode &event, SCTPCommand *sctpCommand, cPacket *msg) |
| void | process_RECEIVE_REQUEST (SCTPEventCode &event, SCTPCommand *sctpCommand) |
| void | process_PRIMARY (SCTPEventCode &event, SCTPCommand *sctpCommand) |
Processing SCTP message arrivals. Invoked from processSCTPMessage(). | |
| bool | process_RCV_Message (SCTPMessage *sctpseg, const IPvXAddress &src, const IPvXAddress &dest) |
| bool | processInitArrived (SCTPInitChunk *initChunk, int32 sport, int32 dport) |
| bool | processInitAckArrived (SCTPInitAckChunk *initAckChunk) |
| bool | processCookieEchoArrived (SCTPCookieEchoChunk *cookieEcho, IPvXAddress addr) |
| bool | processCookieAckArrived () |
| SCTPEventCode | processDataArrived (SCTPDataChunk *dataChunk) |
| SCTPEventCode | processSackArrived (SCTPSackChunk *sackChunk) |
| SCTPEventCode | processHeartbeatAckArrived (SCTPHeartbeatAckChunk *heartbeatack, SCTPPathVariables *path) |
Processing timeouts. Invoked from processTimer(). | |
| int32 | process_TIMEOUT_RTX (SCTPPathVariables *path) |
| void | process_TIMEOUT_HEARTBEAT (SCTPPathVariables *path) |
| void | process_TIMEOUT_HEARTBEAT_INTERVAL (SCTPPathVariables *path, bool force) |
| void | process_TIMEOUT_INIT_REXMIT (SCTPEventCode &event) |
| void | process_TIMEOUT_PROBING () |
| void | process_TIMEOUT_SHUTDOWN (SCTPEventCode &event) |
| int32 | updateCounters (SCTPPathVariables *path) |
Static Protected Member Functions | |
| static void | printSegmentBrief (SCTPMessage *sctpmsg) |
| static const char * | eventName (const int32 event) |
| static int32 | tsnLt (const uint32 tsn1, const uint32 tsn2) |
| static int32 | tsnLe (const uint32 tsn1, const uint32 tsn2) |
| static int32 | tsnGe (const uint32 tsn1, const uint32 tsn2) |
| static int32 | tsnGt (const uint32 tsn1, const uint32 tsn2) |
| static int32 | tsnBetween (const uint32 tsn1, const uint32 midtsn, const uint32 tsn2) |
| static int16 | ssnGt (const uint16 ssn1, const uint16 ssn2) |
Protected Attributes | |
| AddressVector | localAddressList |
| AddressVector | remoteAddressList |
| uint32 | numberOfRemoteAddresses |
| uint32 | inboundStreams |
| uint32 | outboundStreams |
| int32 | status |
| uint32 | initTsn |
| uint32 | initPeerTsn |
| uint32 | sackFrequency |
| double | sackPeriod |
| CCFunctions | ccFunctions |
| uint16 | ccModule |
| cOutVector * | advRwnd |
| cOutVector * | cumTsnAck |
| cOutVector * | sendQueue |
| cOutVector * | numGapBlocks |
| SCTPStateVariables * | state |
| BytesToBeSent | bytes |
| SCTP * | sctpMain |
| cFSM * | fsm |
| SCTPPathMap | sctpPathMap |
| QueueCounter | qCounter |
| SCTPQueue * | transmissionQ |
| SCTPQueue * | retransmissionQ |
| SCTPSendStreamMap | sendStreams |
| SCTPReceiveStreamMap | receiveStreams |
| SCTPAlgorithm * | sctpAlgorithm |
| SSFunctions | ssFunctions |
| uint16 | ssModule |
Private Types | |
| typedef std::map< IPvXAddress, SCTPPathVariables * > | SCTPPathMap |
| typedef std::map< IPvXAddress, uint32 > | CounterMap |
| typedef struct SCTPAssociation::counter | QueueCounter |
| typedef struct SCTPAssociation::calcBytesToSend | BytesToBeSent |
| typedef struct SCTPAssociation::congestionControlFunctions | CCFunctions |
| typedef std::map< uint32, SCTPSendStream * > | SCTPSendStreamMap |
| typedef std::map< uint32, SCTPReceiveStream * > | SCTPReceiveStreamMap |
Private Member Functions | |
| SCTPDataVariables * | makeDataVarFromDataMsg (SCTPDataMsg *datMsg, SCTPPathVariables *path) |
| SCTPPathVariables * | choosePathForRetransmission () |
| void | timeForSack (bool &sackOnly, bool &sackWithData) |
| void | recordCwndUpdate (SCTPPathVariables *path) |
| void | handleChunkReportedAsAcked (uint32 &highestNewAck, simtime_t &rttEstimation, SCTPDataVariables *myChunk, SCTPPathVariables *sackPath) |
| void | handleChunkReportedAsMissing (const SCTPSackChunk *sackChunk, const uint32 highestNewAck, SCTPDataVariables *myChunk, const SCTPPathVariables *sackPath) |
| void | moveChunkToOtherPath (SCTPDataVariables *chunk, SCTPPathVariables *newPath) |
| void | decreaseOutstandingBytes (SCTPDataVariables *chunk) |
| void | increaseOutstandingBytes (SCTPDataVariables *chunk, SCTPPathVariables *path) |
| int32 | calculateBytesToSendOnPath (const SCTPPathVariables *pathVar) |
| void | storePacket (SCTPPathVariables *pathVar, SCTPMessage *sctpMsg, const uint16 chunksAdded, const uint16 dataChunksAdded, const uint32 packetBytes, const bool authAdded) |
| void | loadPacket (SCTPPathVariables *pathVar, SCTPMessage **sctpMsg, uint16 *chunksAdded, uint16 *dataChunksAdded, uint32 *packetBytes, bool *authAdded) |
| void | ackChunk (SCTPDataVariables *chunk) |
| void | unackChunk (SCTPDataVariables *chunk) |
| bool | chunkHasBeenAcked (const SCTPDataVariables *chunk) const |
| bool | chunkHasBeenAcked (const uint32 tsn) const |
Friends | |
| class | SCTP |
| class | SCTPPathVariables |
| int32 | getFsmState () const |
| SCTPStateVariables * | getState () const |
| SCTPQueue * | getTransmissionQueue () const |
| SCTPQueue * | getRetransmissionQueue () const |
| SCTPAlgorithm * | getSctpAlgorithm () const |
| SCTP * | getSctpMain () const |
| cFSM * | getFsm () const |
| cMessage * | getInitTimer () const |
| cMessage * | getShutdownTimer () const |
| cMessage * | getSackTimer () const |
| bool | processTimer (cMessage *msg) |
| bool | processSCTPMessage (SCTPMessage *sctpmsg, const IPvXAddress &srcAddr, const IPvXAddress &destAddr) |
| bool | processAppCommand (cPacket *msg) |
| void | removePath () |
| void | removePath (const IPvXAddress &addr) |
| void | deleteStreams () |
| void | stopTimer (cMessage *timer) |
| void | stopTimers () |
| SCTPPathVariables * | getPath (const IPvXAddress &pathId) const |
| void | printSctpPathMap () const |
| static const char * | stateName (const int32 state) |
| static uint32 | chunkToInt (const char *type) |
typedef struct SCTPAssociation::calcBytesToSend SCTPAssociation::BytesToBeSent [private] |
typedef struct SCTPAssociation::congestionControlFunctions SCTPAssociation::CCFunctions [private] |
typedef std::map<IPvXAddress, uint32> SCTPAssociation::CounterMap [private] |
typedef struct SCTPAssociation::counter SCTPAssociation::QueueCounter [private] |
typedef std::map<IPvXAddress,SCTPPathVariables*> SCTPAssociation::SCTPPathMap [private] |
typedef std::map<uint32, SCTPReceiveStream*> SCTPAssociation::SCTPReceiveStreamMap [private] |
typedef std::map<uint32, SCTPSendStream*> SCTPAssociation::SCTPSendStreamMap [private] |
typedef struct SCTPAssociation::streamSchedulingFunctions SCTPAssociation::SSFunctions [protected] |
| SCTPAssociation::SCTPAssociation | ( | SCTP * | mod, |
| int32 | appGateIndex, | ||
| int32 | assocId | ||
| ) |
Constructor.
Referenced by cloneAssociation().
{
// ====== Initialize variables ===========================================
sctpMain = _module;
appGateIndex = _appGateIndex;
assocId = _assocId;
localPort = 0;
remotePort = 0;
localVTag = 0;
peerVTag = 0;
numberOfRemoteAddresses = 0;
inboundStreams = SCTP_DEFAULT_INBOUND_STREAMS;
outboundStreams = SCTP_DEFAULT_OUTBOUND_STREAMS;
// queues and algorithm will be created on active or passive open
transmissionQ = NULL;
retransmissionQ = NULL;
sctpAlgorithm = NULL;
state = NULL;
sackPeriod = SACK_DELAY;
/*
totalCwndAdjustmentTime = simTime();
lastTotalSSthresh = ~0;
lastTotalCwnd = ~0;*/
cumTsnAck = NULL;
sendQueue = NULL;
numGapBlocks = NULL;
qCounter.roomSumSendStreams = 0;
qCounter.bookedSumSendStreams = 0;
qCounter.roomSumRcvStreams = 0;
bytes.chunk = false;
bytes.packet = false;
bytes.bytesToSend = 0;
sctpEV3 << "SCTPAssociationBase::SCTPAssociation(): new assocId="
<< assocId << endl;
// ====== FSM ============================================================
char fsmName[64];
snprintf(fsmName, sizeof(fsmName), "fsm-%d", assocId);
fsm = new cFSM();
fsm->setName(fsmName);
fsm->setState(SCTP_S_CLOSED);
// ====== Path Info ======================================================
SCTPPathInfo* pinfo = new SCTPPathInfo("pathInfo");
pinfo->setRemoteAddress(IPvXAddress("0.0.0.0"));
// ====== Timers =========================================================
char timerName[128];
snprintf(timerName, sizeof(timerName), "T1_INIT of Association %d", assocId);
T1_InitTimer = new cMessage(timerName);
snprintf(timerName, sizeof(timerName), "T2_SHUTDOWN of Association %d", assocId);
T2_ShutdownTimer = new cMessage(timerName);
snprintf(timerName, sizeof(timerName), "T5_SHUTDOWN_GUARD of Association %d", assocId);
T5_ShutdownGuardTimer = new cMessage(timerName);
snprintf(timerName, sizeof(timerName), "SACK_TIMER of Association %d", assocId);
SackTimer = new cMessage(timerName);
if (sctpMain->testTimeout > 0){
StartTesting = new cMessage("StartTesting");
StartTesting->setContextPointer(this);
StartTesting->setControlInfo(pinfo->dup());
scheduleTimeout(StartTesting, sctpMain->testTimeout);
}
T1_InitTimer->setContextPointer(this);
T2_ShutdownTimer->setContextPointer(this);
SackTimer->setContextPointer(this);
T5_ShutdownGuardTimer->setContextPointer(this);
T1_InitTimer->setControlInfo(pinfo);
T2_ShutdownTimer->setControlInfo(pinfo->dup());
SackTimer->setControlInfo(pinfo->dup());
T5_ShutdownGuardTimer->setControlInfo(pinfo->dup());
// ====== Output vectors =================================================
char vectorName[128];
snprintf(vectorName, sizeof(vectorName), "Advertised Receiver Window %d", assocId);
advRwnd = new cOutVector(vectorName);
// ====== Stream scheduling ==============================================
ssModule = sctpMain->par("ssModule");
switch (ssModule)
{
case ROUND_ROBIN:
ssFunctions.ssInitStreams = &SCTPAssociation::initStreams;
ssFunctions.ssGetNextSid = &SCTPAssociation::streamScheduler;
ssFunctions.ssUsableStreams = &SCTPAssociation::numUsableStreams;
break;
}
}
| SCTPAssociation::~SCTPAssociation | ( | ) |
Destructor.
{
sctpEV3 << "Destructor SCTPAssociation" << endl;
delete T1_InitTimer;
delete T2_ShutdownTimer;
delete T5_ShutdownGuardTimer;
delete SackTimer;
delete advRwnd;
delete cumTsnAck;
delete numGapBlocks;
delete sendQueue;
delete fsm;
delete state;
delete sctpAlgorithm;
}
| void SCTPAssociation::ackChunk | ( | SCTPDataVariables * | chunk | ) | [inline, private] |
Referenced by dequeueAckedChunks(), and handleChunkReportedAsAcked().
{
chunk->hasBeenAcked = true;
}
| void SCTPAssociation::addPath | ( | const IPvXAddress & | addr | ) | [protected] |
Referenced by processInitAckArrived().
{
sctpEV3<<"Add Path remote address: "<<addr<<"\n";
SCTPPathMap::iterator i = sctpPathMap.find(addr);
if (i==sctpPathMap.end())
{
sctpEV3<<__LINE__<<" get new path for "<<addr<<"\n";
SCTPPathVariables* path = new SCTPPathVariables(addr, this);
sctpPathMap[addr] = path;
qCounter.roomTransQ[addr] = 0;
qCounter.bookedTransQ[addr] = 0;
qCounter.roomRetransQ[addr] = 0;
}
sctpEV3<<"path added\n";
}
| bool SCTPAssociation::advanceCtsna | ( | ) | [protected] |
Referenced by processDataArrived().
{
int32 listLength, counter;
ev<<"Entering advanceCtsna(ctsna now =="<< state->cTsnAck<<"\n";;
listLength = state->numGaps;
/* if there are no fragments, we cannot advance the ctsna */
if (listLength == 0) return false;
counter = 0;
while(counter < listLength)
{
/* if we take out a fragment here, we need to modify either counter or list_length */
if (state->cTsnAck + 1 == state->gapStartList[0])
{
/* BINGO ! */
state->cTsnAck = state->gapStopList[0];
/* we can take out a maximum of list_length fragments */
counter++;
for (uint32 i=1; i<state->numGaps; i++)
{
state->gapStartList[i-1] = state->gapStartList[i];
state->gapStopList[i-1] = state->gapStopList[i];
}
}
else
{
ev<<"Entering advanceCtsna(when leaving: ctsna=="<<state->cTsnAck<<"\n";
return false;
}
} /* end while */
ev<<"Entering advanceCtsna(when leaving: ctsna=="<< state->cTsnAck<<"\n";
return true;
}
| bool SCTPAssociation::allPathsInactive | ( | ) | const [protected] |
Referenced by process_TIMEOUT_HEARTBEAT(), process_TIMEOUT_RTX(), and updateCounters().
{
for(SCTPPathMap::const_iterator it = sctpPathMap.begin(); it != sctpPathMap.end(); it++) {
if (it->second->activePath) {
return false;
}
}
return true;
}
| void SCTPAssociation::bytesAllowedToSend | ( | SCTPPathVariables * | path, |
| const bool | firstPass | ||
| ) | [protected] |
Referenced by sendOnPath().
{
assert(path != NULL);
bytes.chunk = false;
bytes.packet = false;
bytes.bytesToSend = 0;
sctpEV3 << "bytesAllowedToSend(" << path->remoteAddress << "):"
<< " osb=" << path->outstandingBytes << " cwnd=" << path->cwnd << endl;
// ====== First transmission =============================================
if (!state->firstDataSent) {
bytes.chunk = true;
}
// ====== Transmission allowed by peer's receiver window? ================
else if (state->peerWindowFull)
{
if (path->outstandingBytes == 0) {
// Zero Window Probing
sctpEV3 << "bytesAllowedToSend(" << path->remoteAddress << "): zeroWindowProbing" << endl;
state->zeroWindowProbing = true;
bytes.chunk = true;
}
}
// ====== Retransmissions ================================================
else {
CounterMap::const_iterator it = qCounter.roomTransQ.find(path->remoteAddress);
sctpEV3 << "bytesAllowedToSend(" << path->remoteAddress << "): bytes in transQ=" << it->second << endl;
if (it->second > 0) {
const int32 allowance = path->cwnd - path->outstandingBytes;
sctpEV3 << "bytesAllowedToSend(" << path->remoteAddress << "): cwnd-osb=" << allowance << endl;
if (state->peerRwnd < path->pmtu) {
bytes.bytesToSend = state->peerRwnd;
sctpEV3 << "bytesAllowedToSend(" << path->remoteAddress << "): rwnd<pmtu" << endl;
return;
}
else if (allowance > 0) {
CounterMap::const_iterator bit = qCounter.bookedTransQ.find(path->remoteAddress);
if (bit->second > (uint32)allowance) {
bytes.bytesToSend = allowance;
sctpEV3 << "bytesAllowedToSend(" << path->remoteAddress << "): cwnd does not allow all RTX" << endl;
return; // More bytes available than allowed -> just return what is allowed.
}
else {
bytes.bytesToSend = bit->second;
sctpEV3 << "bytesAllowedToSend(" << path->remoteAddress << "): cwnd allows more than those "
<< bytes.bytesToSend << " bytes for retransmission" << endl;
}
}
else { // You may retransmit one packet
bytes.packet = true;
sctpEV3 << "bytesAllowedToSend(" << path->remoteAddress << "): allowance<=0: retransmit one packet" << endl;
}
}
// ====== New transmissions ===========================================
if (!bytes.chunk && !bytes.packet) {
if ((path->outstandingBytes < path->cwnd) &&
(!state->peerWindowFull)) {
sctpEV3 << "bytesAllowedToSend(" << path->remoteAddress << "):"
<< " bookedSumSendStreams=" << qCounter.bookedSumSendStreams
<< " bytes.bytesToSend=" << bytes.bytesToSend << endl;
const int32 allowance = path->cwnd - path->outstandingBytes - bytes.bytesToSend;
if (allowance > 0) {
if (qCounter.bookedSumSendStreams > (uint32)allowance) {
bytes.bytesToSend = path->cwnd - path->outstandingBytes;
sctpEV3 << "bytesAllowedToSend(" << path->remoteAddress << "): bytesToSend are limited by cwnd: "
<< bytes.bytesToSend << endl;
}
else {
bytes.bytesToSend += qCounter.bookedSumSendStreams;
sctpEV3 << "bytesAllowedToSend(" << path->remoteAddress << "): send all stored bytes: "
<< bytes.bytesToSend << endl;
}
}
}
}
}
sctpEV3 << "bytesAllowedToSend(" << path->remoteAddress << "):"
<< " osb=" << path->outstandingBytes
<< " cwnd=" << path->cwnd
<< " bytes.packet=" << (bytes.packet ? "YES!" : "no")
<< " bytes.chunk=" << (bytes.chunk ? "YES!" : "no")
<< " bytes.bytesToSend=" << bytes.bytesToSend << endl;
}
| int32 SCTPAssociation::calculateBytesToSendOnPath | ( | const SCTPPathVariables * | pathVar | ) | [private] |
{
int32 bytesToSend;
const SCTPDataMsg* datMsg = peekOutboundDataMsg();
if(datMsg != NULL) {
const uint32 ums = datMsg->getBooksize(); // Get user message size
const uint32 num = (uint32)floor((double)(pathVar->pmtu - 32) / (ums + SCTP_DATA_CHUNK_LENGTH));
if (num * ums > state->peerRwnd) {
// Receiver cannot handle data yet
bytesToSend = 0;
}
else {
// Receiver will accept data
bytesToSend = num * ums;
}
}
else {
bytesToSend = 0;
}
return(bytesToSend);
}
| cMessage* SCTPAssociation::cancelEvent | ( | cMessage * | msg | ) | [inline, protected] |
| SCTPPathVariables * SCTPAssociation::choosePathForRetransmission | ( | ) | [private] |
Referenced by sendOnPath().
{
uint32 max = 0;
SCTPPathVariables* temp = NULL;
for (SCTPPathMap::iterator iterator = sctpPathMap.begin(); iterator != sctpPathMap.end(); ++iterator) {
SCTPPathVariables* path = iterator->second;
CounterMap::const_iterator tq = qCounter.roomTransQ.find(path->remoteAddress);
if ((tq != qCounter.roomTransQ.end()) && (tq->second > max)) {
max = tq->second;
temp = path;
}
}
return temp;
}
| bool SCTPAssociation::chunkHasBeenAcked | ( | const SCTPDataVariables * | chunk | ) | const [inline, private] |
Referenced by cwndUpdateAfterSack(), dequeueAckedChunks(), getOutboundDataChunk(), handleChunkReportedAsMissing(), process_TIMEOUT_RTX(), processSackArrived(), and sendOnPath().
{
return(chunk->hasBeenAcked);
}
| bool SCTPAssociation::chunkHasBeenAcked | ( | const uint32 | tsn | ) | const [inline, private] |
{
const SCTPDataVariables* chunk = retransmissionQ->getChunk(tsn);
if(chunk) {
return(chunkHasBeenAcked(chunk));
}
return(false);
}
| uint32 SCTPAssociation::chunkToInt | ( | const char * | type | ) | [static] |
{
if (strcmp(type, "DATA")==0) return 0;
if (strcmp(type, "INIT")==0) return 1;
if (strcmp(type, "INIT_ACK")==0) return 2;
if (strcmp(type, "SACK")==0) return 3;
if (strcmp(type, "HEARTBEAT")==0) return 4;
if (strcmp(type, "HEARTBEAT_ACK")==0) return 5;
if (strcmp(type, "ABORT")==0) return 6;
if (strcmp(type, "SHUTDOWN")==0) return 7;
if (strcmp(type, "SHUTDOWN_ACK")==0) return 8;
if (strcmp(type, "ERRORTYPE")==0) return 9;
if (strcmp(type, "COOKIE_ECHO")==0) return 10;
if (strcmp(type, "COOKIE_ACK")==0) return 11;
if (strcmp(type, "SHUTDOWN_COMPLETE")==0) return 14;
sctpEV3<<"ChunkConversion not successful\n";
return 0;
}
| SCTPAssociation * SCTPAssociation::cloneAssociation | ( | ) | [protected] |
Utility: clone a listening association. Used for forking.
Referenced by processInitArrived().
{
SCTPAssociation* assoc = new SCTPAssociation(sctpMain,appGateIndex,assocId);
const char* queueClass = transmissionQ->getClassName();
assoc->transmissionQ = check_and_cast<SCTPQueue *>(createOne(queueClass));
assoc->retransmissionQ = check_and_cast<SCTPQueue *>(createOne(queueClass));
const char* sctpAlgorithmClass = sctpAlgorithm->getClassName();
assoc->sctpAlgorithm = check_and_cast<SCTPAlgorithm *>(createOne(sctpAlgorithmClass));
assoc->sctpAlgorithm->setAssociation(assoc);
assoc->sctpAlgorithm->initialize();
assoc->state = assoc->sctpAlgorithm->createStateVariables();
assoc->state->active = false;
assoc->state->fork = true;
assoc->localAddr = localAddr;
assoc->localPort = localPort;
assoc->localAddressList = localAddressList;
FSM_Goto((*assoc->fsm), SCTP_S_CLOSED);
sctpMain->printInfoConnMap();
return assoc;
}
| SCTPSackChunk * SCTPAssociation::createSack | ( | ) | [protected] |
Referenced by sendOnPath(), and sendSack().
{
uint32 key=0, arwnd=0;
sctpEV3<<"SCTPAssociationUtil:createSACK localAddress="<<localAddr<<" remoteAddress="<<remoteAddr<<"\n";
sctpEV3<<" localRwnd="<<state->localRwnd<<" queuedBytes="<<state->queuedReceivedBytes<<"\n";
if ((int32)(state->localRwnd - state->queuedReceivedBytes) <= 0)
{
arwnd = 0;
if (state->swsLimit > 0)
state->swsAvoidanceInvoked = true;
}
else if (state->localRwnd - state->queuedReceivedBytes < state->swsLimit || state->swsAvoidanceInvoked == true)
{
arwnd = 1;
if (state->swsLimit > 0)
state->swsAvoidanceInvoked = true;
sctpEV3<<"arwnd=1; createSack : SWS Avoidance ACTIVE !!!\n";
}
else
{
arwnd = state->localRwnd - state->queuedReceivedBytes;
sctpEV3<<simTime()<<" arwnd = "<<state->localRwnd<<" - "<<state->queuedReceivedBytes<<" = "<<arwnd<<"\n";
}
advRwnd->record(arwnd);
SCTPSackChunk* sackChunk=new SCTPSackChunk("SACK");
sackChunk->setChunkType(SACK);
sackChunk->setCumTsnAck(state->cTsnAck);
sackChunk->setA_rwnd(arwnd);
uint32 numGaps=state->numGaps;
uint32 numDups=state->dupList.size();
uint16 sackLength=SCTP_SACK_CHUNK_LENGTH + numGaps*4 + numDups*4;
uint32 mtu = getPath(remoteAddr)->pmtu;
if (sackLength > mtu-32) // FIXME
{
if (SCTP_SACK_CHUNK_LENGTH + numGaps*4 > mtu-32)
{
numDups = 0;
numGaps = (uint32)((mtu-32-SCTP_SACK_CHUNK_LENGTH)/4);
}
else
{
numDups = (uint32)((mtu-32-SCTP_SACK_CHUNK_LENGTH - numGaps*4)/4);
}
sackLength=SCTP_SACK_CHUNK_LENGTH + numGaps*4 + numDups*4;
}
sackChunk->setNumGaps(numGaps);
sackChunk->setNumDupTsns(numDups);
sackChunk->setBitLength(sackLength*8);
sctpEV3<<"Sack arwnd="<<sackChunk->getA_rwnd()<<" ctsnAck="<<state->cTsnAck<<" numGaps="<<numGaps<<" numDups="<<numDups<<"\n";
if (numGaps > 0)
{
sackChunk->setGapStartArraySize(numGaps);
sackChunk->setGapStopArraySize(numGaps);
uint32 last = state->cTsnAck;
for (key=0; key<numGaps; key++)
{
// ====== Validity check ===========================================
assert(tsnGt(state->gapStartList[key], last + 1));
assert(tsnGe(state->gapStopList[key], state->gapStartList[key]));
last = state->gapStopList[key];
sackChunk->setGapStart(key, state->gapStartList[key]);
sackChunk->setGapStop(key, state->gapStopList[key]);
}
}
if (numDups > 0)
{
sackChunk->setDupTsnsArraySize(numDups);
key=0;
for(std::list<uint32>::iterator iter=state->dupList.begin(); iter!=state->dupList.end(); iter++)
{
sackChunk->setDupTsns(key, (*iter));
key++;
if (key == numDups)
break;
}
state->dupList.clear();
}
sctpEV3<<endl;
for (uint32 i=0; i<numGaps; i++)
sctpEV3<<sackChunk->getGapStart(i)<<" - "<<sackChunk->getGapStop(i)<<"\n";
sctpEV3<<"send "<<sackChunk->getName()<<" from "<<localAddr<<" to "<<state->lastDataSourceAddress<<"\n";
return sackChunk;
}
| void SCTPAssociation::cwndUpdateAfterCwndTimeout | ( | SCTPPathVariables * | path | ) | [protected] |
Referenced by stateEntered().
{
// When the association does not transmit data on a given transport address
// within an RTO, the cwnd of the transport address SHOULD be adjusted to 2*MTU.
sctpEV3 << simTime() << ":\tCC [cwndUpdateAfterCwndTimeout]\t" << path->remoteAddress
<< "\tsst=" << path->ssthresh << " cwnd=" << path->cwnd;
path->cwnd = (int32)min(4 * path->pmtu, max(2 * path->pmtu, 4380));
sctpEV3 << "\t=>\tsst=" << path->ssthresh << " cwnd=" << path->cwnd << endl;
recordCwndUpdate(path);
}
| void SCTPAssociation::cwndUpdateAfterRtxTimeout | ( | SCTPPathVariables * | path | ) | [protected] |
Referenced by stateEntered().
{
sctpEV3 << simTime() << ":\tCC [cwndUpdateAfterRtxTimeout]\t" << path->remoteAddress
<< "\tsst=" << path->ssthresh << " cwnd=" << path->cwnd;
path->ssthresh = (int32)max(path->cwnd / 2, 4 * path->pmtu);
path->cwnd = path->pmtu;
sctpEV3 << "\t=>\tsst=" << path->ssthresh << " cwnd=" << path->cwnd << endl;
path->partialBytesAcked = 0;
recordCwndUpdate(path);
// Leave Fast Recovery mode
if (path->fastRecoveryActive == true) {
path->fastRecoveryActive = false;
path->fastRecoveryExitPoint = 0;
}
}
| void SCTPAssociation::cwndUpdateAfterSack | ( | ) | [protected] |
Referenced by stateEntered().
{
for (SCTPPathMap::iterator iter = sctpPathMap.begin(); iter != sctpPathMap.end(); iter++) {
SCTPPathVariables* path = iter->second;
if(path->fastRecoveryActive == false) {
// ====== Retransmission required -> reduce congestion window ======
if(path->requiresRtx) {
double decreaseFactor = 0.5;
sctpEV3 << simTime() << ":\tCC [cwndUpdateAfterSack]\t" << path->remoteAddress
<< "\tsst=" << path->ssthresh << " cwnd=" << path->cwnd;
path->ssthresh = max((int32)path->cwnd - (int32)rint(decreaseFactor * (double)path->cwnd),
4 * (int32)path->pmtu);
path->cwnd = path->ssthresh;
sctpEV3 << "\t=>\tsst=" << path->ssthresh << " cwnd=" << path->cwnd << endl;
recordCwndUpdate(path);
path->partialBytesAcked = 0;
// ====== Fast Recovery ========================================
if(state->fastRecoverySupported) {
uint32 highestAckOnPath = state->lastTsnAck;
for(SCTPQueue::PayloadQueue::iterator pq = retransmissionQ->payloadQueue.begin();
pq != retransmissionQ->payloadQueue.end(); pq++) {
if( (chunkHasBeenAcked(pq->second) == true) &&
(tsnGt(pq->second->tsn, highestAckOnPath)) &&
(pq->second->getLastDestinationPath() == path) ) {
// T.D. 21.11.09: Only take care of TSNs on the same path!
highestAckOnPath = pq->second->tsn;
}
}
/* this can ONLY become TRUE, when Fast Recovery IS supported */
path->fastRecoveryActive = true;
path->fastRecoveryExitPoint = highestAckOnPath;
path->fastRecoveryEnteringTime = simTime();
sctpEV3 << simTime() << ":\tCC [cwndUpdateAfterSack] Entering Fast Recovery on path "
<< path->remoteAddress
<< ", exit point is " << path->fastRecoveryExitPoint << endl;
}
}
}
else {
for (SCTPPathMap::iterator iter = sctpPathMap.begin(); iter != sctpPathMap.end(); iter++) {
SCTPPathVariables* path = iter->second;
if(path->fastRecoveryActive) {
sctpEV3 << simTime() << ":\tCC [cwndUpdateAfterSack] Still in Fast Recovery on path "
<< path->remoteAddress
<< ", exit point is " << path->fastRecoveryExitPoint << endl;
}
}
}
}
}
| void SCTPAssociation::cwndUpdateBytesAcked | ( | SCTPPathVariables * | path, |
| const uint32 | ackedBytes, | ||
| const bool | ctsnaAdvanced | ||
| ) | [protected] |
Referenced by stateEntered().
{
sctpEV3 << simulation.getSimTime() << "====> cwndUpdateBytesAcked:"
<< " path=" << path->remoteAddress
<< " ackedBytes=" << ackedBytes
<< " ctsnaAdvanced=" << ((ctsnaAdvanced == true) ? "yes" : "no")
<< " cwnd=" << path->cwnd
<< " ssthresh=" << path->ssthresh
<< " ackedBytes=" << ackedBytes
<< " pathOsbBeforeUpdate=" << path->outstandingBytesBeforeUpdate
<< " pathOsb=" << path->outstandingBytes
<< endl;
if (path->fastRecoveryActive == false) {
// T.D. 21.11.09: Increasing cwnd is only allowed when not being in
// Fast Recovery mode!
// ====== Slow Start ==================================================
if (path->cwnd <= path->ssthresh) {
// ------ Clear PartialBytesAcked counter --------------------------
path->partialBytesAcked = 0;
// ------ Increase Congestion Window -------------------------------
if ((ctsnaAdvanced == true) &&
(path->outstandingBytesBeforeUpdate >= path->cwnd)) {
sctpEV3 << simTime() << ":\tCC [cwndUpdateBytesAcked-SlowStart]\t" << path->remoteAddress
<< "\tsst=" << path->ssthresh << " cwnd=" << path->cwnd << " acked=" << ackedBytes;
path->cwnd += (int32)min(path->pmtu, ackedBytes);
sctpEV3 << "\t=>\tsst=" << path->ssthresh << " cwnd=" << path->cwnd << endl;
recordCwndUpdate(path);
}
// ------ No need to increase Congestion Window --------------------
else {
sctpEV3 << simTime() << ":\tCC "
<< "Not increasing cwnd of path " << path->remoteAddress << " in slow start:\t"
<< "ctsnaAdvanced=" << ((ctsnaAdvanced == true) ? "yes" : "no") << "\t"
<< "cwnd=" << path->cwnd << "\t"
<< "ssthresh=" << path->ssthresh << "\t"
<< "ackedBytes=" << ackedBytes << "\t"
<< "pathOsbBeforeUpdate=" << path->outstandingBytesBeforeUpdate << "\t"
<< "pathOsb=" << path->outstandingBytes << "\t"
<< "(pathOsbBeforeUpdate >= path->cwnd)="
<< (path->outstandingBytesBeforeUpdate >= path->cwnd) << endl;
}
}
// ====== Congestion Avoidance ========================================
else
{
// ------ Increase PartialBytesAcked counter -----------------------
path->partialBytesAcked += ackedBytes;
double increaseFactor = 1.0;
// ------ Increase Congestion Window -------------------------------
if ( (path->partialBytesAcked >= path->cwnd) &&
(ctsnaAdvanced == true) &&
(path->outstandingBytesBeforeUpdate >= path->cwnd) ) {
sctpEV3 << simTime() << ":\tCC [cwndUpdateBytesAcked-CgAvoidance]\t" << path->remoteAddress
<< "\tsst=" << path->ssthresh << " cwnd=" << path->cwnd << " acked=" << ackedBytes;
path->cwnd += (int32)rint(increaseFactor * path->pmtu);
sctpEV3 << "\t=>\tsst=" << path->ssthresh << " cwnd=" << path->cwnd << endl;
recordCwndUpdate(path);
path->partialBytesAcked =
((path->cwnd < path->partialBytesAcked) ?
(path->partialBytesAcked - path->cwnd) : 0);
}
// ------ No need to increase Congestion Window -------------------
else {
sctpEV3 << simTime() << ":\tCC "
<< "Not increasing cwnd of path " << path->remoteAddress << " in congestion avoidance: "
<< "ctsnaAdvanced=" << ((ctsnaAdvanced == true) ? "yes" : "no") << "\t"
<< "cwnd=" << path->cwnd << "\t"
<< "ssthresh=" << path->ssthresh << "\t"
<< "ackedBytes=" << ackedBytes << "\t"
<< "pathOsbBeforeUpdate=" << path->outstandingBytesBeforeUpdate << "\t"
<< "pathOsb=" << path->outstandingBytes << "\t"
<< "(pathOsbBeforeUpdate >= path->cwnd)="
<< (path->outstandingBytesBeforeUpdate >= path->cwnd) << "\t"
<< "partialBytesAcked=" << path->partialBytesAcked << "\t"
<< "(path->partialBytesAcked >= path->cwnd)="
<< (path->partialBytesAcked >= path->cwnd) << endl;
}
}
// ====== Reset PartialBytesAcked counter if no more outstanding bytes
if(path->outstandingBytes == 0) {
path->partialBytesAcked = 0;
}
}
else {
sctpEV3 << simTime() << ":\tCC "
<< "Not increasing cwnd of path " << path->remoteAddress
<< " during Fast Recovery" << endl;
}
}
| void SCTPAssociation::cwndUpdateMaxBurst | ( | SCTPPathVariables * | path | ) | [protected] |
Referenced by stateEntered().
{
if(path->cwnd > ((path->outstandingBytes + state->maxBurst * path->pmtu))) {
sctpEV3 << simTime() << ":\tCC [cwndUpdateMaxBurst]\t" << path->remoteAddress
<< "\tsst=" << path->ssthresh << " cwnd=" << path->cwnd
<< "\tosb=" << path->outstandingBytes
<< "\tmaxBurst=" << state->maxBurst * path->pmtu << endl;
path->cwnd = path->outstandingBytes + (state->maxBurst * path->pmtu);
recordCwndUpdate(path);
sctpEV3 << "\t=>\tsst=" << path->ssthresh << " cwnd=" << path->cwnd << endl;
}
}
| void SCTPAssociation::decreaseOutstandingBytes | ( | SCTPDataVariables * | chunk | ) | [private] |
Referenced by dequeueAckedChunks(), handleChunkReportedAsAcked(), handleChunkReportedAsMissing(), moveChunkToOtherPath(), and tsnWasReneged().
{
SCTPPathVariables* lastPath = chunk->getLastDestinationPath();
assert(lastPath->outstandingBytes >= chunk->booksize);
lastPath->outstandingBytes -= chunk->booksize;
state->outstandingBytes -= chunk->booksize;
assert((int64)state->outstandingBytes >= 0);
chunk->countsAsOutstanding = false;
CounterMap::iterator iterator = qCounter.roomRetransQ.find(lastPath->remoteAddress);
iterator->second -= ADD_PADDING(chunk->booksize + SCTP_DATA_CHUNK_LENGTH);
}
| void SCTPAssociation::deleteStreams | ( | ) |
Referenced by SCTP::removeAssociation().
{
for (SCTPSendStreamMap::iterator it=sendStreams.begin(); it != sendStreams.end(); it++)
{
it->second->deleteQueue();
}
for (SCTPReceiveStreamMap::iterator it=receiveStreams.begin(); it != receiveStreams.end(); it++)
{
delete it->second;
}
}
| uint32 SCTPAssociation::dequeueAckedChunks | ( | const uint32 | tsna, |
| SCTPPathVariables * | path, | ||
| simtime_t & | rttEstimation | ||
| ) | [protected] |
Referenced by process_RCV_Message(), and processSackArrived().
{
SCTP::AssocStat* assocStat = sctpMain->getAssocStat(assocId);
uint32 newlyAckedBytes = 0;
uint64 sendBufferBeforeUpdate = state->sendBuffer;
// Set it ridiculously high
rttEstimation = MAXTIME;
// Are there chunks in the retransmission queue ? If Yes -> check for dequeue.
SCTPQueue::PayloadQueue::iterator iterator = retransmissionQ->payloadQueue.begin();
while (iterator != retransmissionQ->payloadQueue.end()) {
SCTPDataVariables* chunk = iterator->second;
if (tsnGe(tsna, chunk->tsn)) {
// Dequeue chunk, cause it has been acked
if (transmissionQ->getChunk(chunk->tsn)) {
transmissionQ->removeMsg(chunk->tsn);
chunk->enqueuedInTransmissionQ = false;
CounterMap::iterator q = qCounter.roomTransQ.find(chunk->getNextDestination());
q->second -= ADD_PADDING(chunk->len/8+SCTP_DATA_CHUNK_LENGTH);
CounterMap::iterator qb = qCounter.bookedTransQ.find(chunk->getNextDestination());
qb->second -= chunk->booksize;
}
chunk = retransmissionQ->getAndExtractChunk(chunk->tsn);
state->sendBuffer -= chunk->len/8;
SCTPPathVariables* lastPath = chunk->getLastDestinationPath();
assert(lastPath != NULL);
if (!chunkHasBeenAcked(chunk)) {
newlyAckedBytes += (chunk->booksize);
sctpEV3 << simTime() << ": CumAcked TSN " << chunk->tsn
<< " on path " << chunk->getLastDestination() << endl;
lastPath->newlyAckedBytes += (chunk->booksize);
// T.D. 05.12.09: CumAck affects lastPath -> reset its T3 timer later.
lastPath->newCumAck = true;
}
if(assocStat) {
assocStat->ackedBytes += chunk->len/8;
}
if ((chunkHasBeenAcked(chunk) == false) && (chunk->countsAsOutstanding)) {
ackChunk(chunk);
if ((chunk->numberOfTransmissions == 1) && (chunk->getLastDestinationPath() == sackPath)) {
const simtime_t timeDifference = simTime() - chunk->sendTime;
if ((timeDifference < rttEstimation) || (rttEstimation == MAXTIME)) {
rttEstimation = timeDifference;
}
}
decreaseOutstandingBytes(chunk);
}
if (chunk->userData != NULL) {
delete chunk->userData;
}
delete chunk;
}
else {
break;
}
iterator = retransmissionQ->payloadQueue.begin();
}
if(sendBufferBeforeUpdate != state->sendBuffer && state->sendBuffer < state->sendQueueLimit) {
// T.D. 06.02.2010: Just send SCTP_I_SENDQUEUE_ABATED once, after all newly acked
// chunks have been dequeued.
// I.R. only send indication if the sendBuffer size has dropped below the sendQueueLimit
assert(state->lastSendQueueAbated < simTime());
state->appSendAllowed = true;
sctpEV3 << simTime() << ":\tSCTP_I_SENDQUEUE_ABATED("
<< sendBufferBeforeUpdate - state->sendBuffer << ") to refill buffer "
<< state->sendBuffer << "/" << state->sendQueueLimit << endl;
sendIndicationToApp(SCTP_I_SENDQUEUE_ABATED, sendBufferBeforeUpdate - state->sendBuffer);
state->lastSendQueueAbated = simTime();
}
sctpEV3 << "dequeueAckedChunks(): newlyAckedBytes=" << newlyAckedBytes
<< ", rttEstimation=" << rttEstimation << endl;
return (newlyAckedBytes);
}
| SCTPDataMsg * SCTPAssociation::dequeueOutboundDataMsg | ( | const int32 | availableSpace, |
| const int32 | availableCwnd | ||
| ) | [protected] |
Referenced by sendOnPath().
{
SCTPDataMsg* datMsg=NULL;
int32 nextStream = -1;
sctpEV3<<"dequeueOutboundDataMsg: " << availableSpace <<" bytes left to be sent" << endl;
/* Only change stream if we don't have to finish a fragmented message */
if (state->lastMsgWasFragment)
nextStream = state->lastStreamScheduled;
else
nextStream = (this->*ssFunctions.ssGetNextSid)(false);
if (nextStream == -1)
return NULL;
sctpEV3<<"dequeueOutboundDataMsg: now stream "<< nextStream << endl;
for (SCTPSendStreamMap::iterator iter=sendStreams.begin(); iter!=sendStreams.end(); ++iter)
{
if ((int32)iter->first==nextStream)
{
SCTPSendStream* stream=iter->second;
cQueue* streamQ = NULL;
if (!stream->getUnorderedStreamQ()->empty())
{
streamQ = stream->getUnorderedStreamQ();
sctpEV3<<"DequeueOutboundDataMsg() found chunks in stream "<<iter->first<<" unordered queue, queue size="<<stream->getUnorderedStreamQ()->getLength()<<"\n";
}
else if (!stream->getStreamQ()->empty())
{
streamQ = stream->getStreamQ();
sctpEV3<<"DequeueOutboundDataMsg() found chunks in stream "<<iter->first<<" ordered queue, queue size="<<stream->getStreamQ()->getLength()<<"\n";
}
if (streamQ)
{
int32 b=ADD_PADDING( (check_and_cast<SCTPSimpleMessage*>(((SCTPDataMsg*)streamQ->front())->getEncapsulatedPacket())->getByteLength()+SCTP_DATA_CHUNK_LENGTH));
/* check if chunk found in queue has to be fragmented */
if (b > (int32)state->assocPmtu - IP_HEADER_LENGTH - SCTP_COMMON_HEADER)
{
/* START FRAGMENTATION */
SCTPDataMsg* datMsgQueued = (SCTPDataMsg*)streamQ->pop();
SCTPSimpleMessage *datMsgQueuedSimple = check_and_cast<SCTPSimpleMessage*>(datMsgQueued->getEncapsulatedPacket());
SCTPDataMsg* datMsgLastFragment = NULL;
uint32 offset = 0;
sctpEV3<<"Fragmentation: chunk " << &datMsgQueued << ", size = " << datMsgQueued->getByteLength() << endl;
while (datMsgQueued)
{
/* detemine size of fragment, either max payload or what's left */
uint32 msgbytes = state->assocPmtu - IP_HEADER_LENGTH - SCTP_COMMON_HEADER - SCTP_DATA_CHUNK_LENGTH;
if (msgbytes > datMsgQueuedSimple->getDataLen() - offset)
msgbytes = datMsgQueuedSimple->getDataLen() - offset;
/* new DATA msg */
SCTPDataMsg* datMsgFragment = new SCTPDataMsg();
datMsgFragment->setSid(datMsgQueued->getSid());
datMsgFragment->setPpid(datMsgQueued->getPpid());
datMsgFragment->setInitialDestination(datMsgQueued->getInitialDestination());
datMsgFragment->setEnqueuingTime(datMsgQueued->getEnqueuingTime());
datMsgFragment->setMsgNum(datMsgQueued->getMsgNum());
datMsgFragment->setOrdered(datMsgQueued->getOrdered());
datMsgFragment->setExpiryTime(datMsgQueued->getExpiryTime());
datMsgFragment->setRtx(datMsgQueued->getRtx());
datMsgFragment->setFragment(true);
datMsgFragment->setBooksize(msgbytes + state->header);
/* is this the first fragment? */
if (offset == 0)
datMsgFragment->setBBit(true);
/* new msg */
SCTPSimpleMessage *datMsgFragmentSimple = new SCTPSimpleMessage();
datMsgFragmentSimple->setName(datMsgQueuedSimple->getName());
datMsgFragmentSimple->setCreationTime(datMsgQueuedSimple->getCreationTime());
datMsgFragmentSimple->setDataArraySize(msgbytes);
datMsgFragmentSimple->setDataLen(msgbytes);
datMsgFragmentSimple->setByteLength(msgbytes);
/* copy data */
for (uint32 i = offset; i < offset + msgbytes; i++)
datMsgFragmentSimple->setData(i - offset, datMsgQueuedSimple->getData(i));
offset += msgbytes;
datMsgFragment->encapsulate(datMsgFragmentSimple);
/* insert fragment into queue */
if (!streamQ->empty())
{
if (!datMsgLastFragment)
{
/* insert first fragment at the begining of the queue*/
streamQ->insertBefore((SCTPDataMsg*)streamQ->front(), datMsgFragment);
}
else
{
/* insert fragment after last inserted */
streamQ->insertAfter(datMsgLastFragment, datMsgFragment);
}
}
else
streamQ->insert(datMsgFragment);
state->queuedMessages++;
qCounter.roomSumSendStreams += ADD_PADDING(datMsgFragment->getByteLength() + SCTP_DATA_CHUNK_LENGTH);
qCounter.bookedSumSendStreams += datMsgFragment->getBooksize();
sctpEV3<<"Fragmentation: fragment " << &datMsgFragment << " created, length = " << datMsgFragmentSimple->getByteLength() << ", queue size = " << streamQ->getLength() << endl;
datMsgLastFragment = datMsgFragment;
/* all fragments done? */
if (datMsgQueuedSimple->getDataLen() == offset)
{
datMsgFragment->setEBit(true);
/* remove original element */
sctpEV3<<"Fragmentation: delete " << &datMsgQueued << endl;
//streamQ->pop();
qCounter.roomSumSendStreams -= ADD_PADDING(datMsgQueued->getByteLength() + SCTP_DATA_CHUNK_LENGTH);
qCounter.bookedSumSendStreams -= datMsgQueued->getBooksize();
delete datMsgQueued;
datMsgQueued = NULL;
state->queuedMessages--;
}
}
/* the next chunk returned will always be a fragment */
state->lastMsgWasFragment = true;
b=ADD_PADDING( (check_and_cast<SCTPSimpleMessage*>(((SCTPDataMsg*)streamQ->front())->getEncapsulatedPacket())->getBitLength()/8+SCTP_DATA_CHUNK_LENGTH));
/* FRAGMENTATION DONE */
}
if ((b <= availableSpace) &&
( (int32)((SCTPDataMsg*)streamQ->front())->getBooksize() <= availableCwnd)) {
datMsg = (SCTPDataMsg*)streamQ->pop();
/*if (!state->appSendAllowed && streamQ->getLength()<=state->sendQueueLimit)
{
state->appSendAllowed = true;
sendIndicationToApp(SCTP_I_SENDQUEUE_ABATED);
}*/
sendQueue->record(streamQ->getLength());
if (!datMsg->getFragment())
{
datMsg->setBBit(true);
datMsg->setEBit(true);
state->lastMsgWasFragment = false;
}
else
{
if (datMsg->getEBit())
state->lastMsgWasFragment = false;
else
state->lastMsgWasFragment = true;
}
sctpEV3<<"DequeueOutboundDataMsg() found chunk ("<<&datMsg<<") in the stream queue "<<&iter->first<<"("<<streamQ<<") queue size="<<streamQ->getLength()<<"\n";
}
}
break;
}
}
if (datMsg != NULL)
{
qCounter.roomSumSendStreams -= ADD_PADDING( (check_and_cast<SCTPSimpleMessage*>(datMsg->getEncapsulatedPacket())->getBitLength()/8+SCTP_DATA_CHUNK_LENGTH));
qCounter.bookedSumSendStreams -= datMsg->getBooksize();
}
return (datMsg);
}
| void SCTPAssociation::disposeOf | ( | SCTPMessage * | sctpmsg | ) | [protected] |
Referenced by process_RCV_Message().
{
SCTPChunk* chunk;
uint32 numberOfChunks = sctpmsg->getChunksArraySize();
if (numberOfChunks>0)
for (uint32 i=0; i<numberOfChunks; i++)
{
chunk = (SCTPChunk*)(sctpmsg->removeChunk());
if (chunk->getChunkType()==DATA)
delete (SCTPSimpleMessage*)chunk->decapsulate();
delete chunk;
}
delete sctpmsg;
}
| const char * SCTPAssociation::eventName | ( | const int32 | event | ) | [static, protected] |
Utility: returns name of SCTP_E_xxx constants
Referenced by performStateTransition(), and processAppCommand().
{
#define CASE(x) case x: s=#x+7; break
const char* s = "unknown";
switch (event) {
CASE(SCTP_E_OPEN_PASSIVE);
CASE(SCTP_E_ASSOCIATE);
CASE(SCTP_E_SHUTDOWN);
CASE(SCTP_E_CLOSE);
CASE(SCTP_E_ABORT);
CASE(SCTP_E_SEND);
CASE(SCTP_E_RCV_INIT);
CASE(SCTP_E_RCV_ABORT);
CASE(SCTP_E_RCV_VALID_COOKIE_ECHO);
CASE(SCTP_E_RCV_INIT_ACK);
CASE(SCTP_E_RCV_COOKIE_ACK);
CASE(SCTP_E_RCV_SHUTDOWN);
CASE(SCTP_E_RCV_SHUTDOWN_ACK);
CASE(SCTP_E_RCV_SHUTDOWN_COMPLETE);
CASE(SCTP_E_TIMEOUT_INIT_TIMER);
CASE(SCTP_E_TIMEOUT_SHUTDOWN_TIMER);
CASE(SCTP_E_TIMEOUT_RTX_TIMER);
CASE(SCTP_E_TIMEOUT_HEARTBEAT_TIMER);
CASE(SCTP_E_RECEIVE);
CASE(SCTP_E_DUP_RECEIVED);
CASE(SCTP_E_PRIMARY);
CASE(SCTP_E_QUEUE_MSGS_LIMIT);
CASE(SCTP_E_QUEUE_BYTES_LIMIT);
CASE(SCTP_E_NO_MORE_OUTSTANDING);
CASE(SCTP_E_IGNORE);
CASE(SCTP_E_DELIVERED);
CASE(SCTP_E_SEND_SHUTDOWN_ACK);
CASE(SCTP_E_STOP_SENDING);
}
return s;
#undef CASE
}
| cFSM* SCTPAssociation::getFsm | ( | ) | const [inline] |
{ return fsm; };
| int32 SCTPAssociation::getFsmState | ( | ) | const [inline] |
{ return fsm->getState(); };
| cMessage* SCTPAssociation::getInitTimer | ( | ) | const [inline] |
{ return T1_InitTimer; };
| const IPvXAddress& SCTPAssociation::getNextAddress | ( | const SCTPPathVariables * | oldPath | ) | const [inline, protected] |
{
const SCTPPathVariables* nextPath = getNextPath(oldPath);
if(nextPath != NULL) {
return(nextPath->remoteAddress);
}
return(SCTPDataVariables::zeroAddress);
}
| SCTPPathVariables * SCTPAssociation::getNextDestination | ( | SCTPDataVariables * | chunk | ) | const [protected] |
Referenced by handleChunkReportedAsMissing(), and process_TIMEOUT_RTX().
{
SCTPPathVariables* next;
SCTPPathVariables* last;
sctpEV3 << "Running getNextDestination()" << endl;
if (chunk->numberOfTransmissions == 0) {
if (chunk->getInitialDestinationPath() == NULL) {
next = state->getPrimaryPath();
}
else {
next = chunk->getInitialDestinationPath();
}
}
else {
if (chunk->hasBeenFastRetransmitted) {
sctpEV3 << "Chunk is scheduled for FastRetransmission. Next destination = "
<< chunk->getLastDestination() << endl;
return(chunk->getLastDestinationPath());
}
// If this is a retransmission, we should choose another, active path.
last = chunk->getLastDestinationPath();
next = getNextPath(last);
if( (next == NULL) || (next->confirmed == false) ) {
next = last;
}
}
sctpEV3 << "getNextDestination(): chunk was last sent to " << last->remoteAddress
<< ", will next be sent to path " << next->remoteAddress << endl;
return (next);
}
| SCTPPathVariables * SCTPAssociation::getNextPath | ( | const SCTPPathVariables * | oldPath | ) | const [protected] |
Referenced by getNextDestination(), process_TIMEOUT_HEARTBEAT(), process_TIMEOUT_RTX(), and updateCounters().
{
int32 hit = 0;
if (sctpPathMap.size() > 1) {
for (SCTPPathMap::const_iterator iterator = sctpPathMap.begin();
iterator != sctpPathMap.end(); iterator++) {
if (iterator->second == oldPath) {
if (++hit == 1) {
continue;
}
else {
break;
}
}
if (iterator->second->activePath) {
return iterator->second;
}
}
}
return(NULL);
}
| SCTPDataVariables * SCTPAssociation::getOutboundDataChunk | ( | const SCTPPathVariables * | path, |
| const int32 | availableSpace, | ||
| const int32 | availableCwnd | ||
| ) | [protected] |
Referenced by sendOnPath().
{
/* are there chunks in the transmission queue ? If Yes -> dequeue and return it */
sctpEV3 << "getOutboundDataChunk(" << path->remoteAddress << "):"
<< " availableSpace=" << availableSpace
<< " availableCwnd=" << availableCwnd
<< endl;
if (!transmissionQ->payloadQueue.empty()) {
for(SCTPQueue::PayloadQueue::iterator it = transmissionQ->payloadQueue.begin();
it != transmissionQ->payloadQueue.end(); it++) {
SCTPDataVariables* chunk = it->second;
if( (chunkHasBeenAcked(chunk) == false) &&
(chunk->getNextDestinationPath() == path) ) {
const int32 len = ADD_PADDING(chunk->len/8+SCTP_DATA_CHUNK_LENGTH);
sctpEV3 << "getOutboundDataChunk() found chunk " << chunk->tsn
<<" in the transmission queue, length=" << len << endl;
if ((len <= availableSpace) &&
((int32)chunk->booksize <= availableCwnd)) {
// T.D. 05.01.2010: The bookkeeping counters may only be decreased when
// this chunk is actually dequeued. Therefore, the check
// for "chunkHasBeenAcked==false" has been moved into the
// "if" statement above!
transmissionQ->payloadQueue.erase(it);
chunk->enqueuedInTransmissionQ = false;
CounterMap::iterator i = qCounter.roomTransQ.find(path->remoteAddress);
i->second -= ADD_PADDING(chunk->len/8+SCTP_DATA_CHUNK_LENGTH);
CounterMap::iterator ib = qCounter.bookedTransQ.find(path->remoteAddress);
ib->second -= chunk->booksize;
return chunk;
}
}
}
}
return NULL;
}
| int32 SCTPAssociation::getOutstandingBytes | ( | ) | const [protected] |
Referenced by process_CLOSE(), process_RCV_Message(), processSackArrived(), sendShutdownAck(), and stateEntered().
{
int32 osb = 0;
for (SCTPPathMap::const_iterator pm = sctpPathMap.begin(); pm != sctpPathMap.end(); pm++) {
osb += pm->second->outstandingBytes;
}
return osb;
}
| SCTPPathVariables* SCTPAssociation::getPath | ( | const IPvXAddress & | pathId | ) | const [inline] |
Referenced by createSack(), process_PRIMARY(), process_RCV_Message(), process_SEND(), process_STATUS(), processDataArrived(), processHeartbeatAckArrived(), processInitAckArrived(), processInitArrived(), processSackArrived(), processTimer(), recordInPathVectors(), sendInit(), sendInitAck(), and sendShutdownAck().
{
SCTPPathMap::const_iterator iterator = sctpPathMap.find(pathId);
if (iterator !=sctpPathMap.end()) {
return iterator->second;
}
return NULL;
}
| SCTPQueue* SCTPAssociation::getRetransmissionQueue | ( | ) | const [inline] |
Referenced by SCTP::removeAssociation().
{ return retransmissionQ; };
| cMessage* SCTPAssociation::getSackTimer | ( | ) | const [inline] |
{ return SackTimer; };
| SCTPAlgorithm* SCTPAssociation::getSctpAlgorithm | ( | ) | const [inline] |
{ return sctpAlgorithm; };
| SCTP* SCTPAssociation::getSctpMain | ( | ) | const [inline] |
Referenced by SCTPPathVariables::SCTPPathVariables().
{ return sctpMain; };
| cMessage* SCTPAssociation::getShutdownTimer | ( | ) | const [inline] |
{ return T2_ShutdownTimer; };
| SCTPStateVariables* SCTPAssociation::getState | ( | ) | const [inline] |
{ return state; };
| SCTPQueue* SCTPAssociation::getTransmissionQueue | ( | ) | const [inline] |
Referenced by SCTP::removeAssociation(), and SCTPAlgorithm::setAssociation().
{ return transmissionQ; };
| void SCTPAssociation::handleChunkReportedAsAcked | ( | uint32 & | highestNewAck, |
| simtime_t & | rttEstimation, | ||
| SCTPDataVariables * | myChunk, | ||
| SCTPPathVariables * | sackPath | ||
| ) | [private] |
Referenced by processSackArrived().
{
SCTPPathVariables* myChunkLastPath = myChunk->getLastDestinationPath();
if ( (myChunk->numberOfTransmissions == 1) &&
(myChunk->hasBeenReneged == false) ) {
if (myChunkLastPath == sackPath) {
const simtime_t timeDifference = simTime() - myChunk->sendTime;
if((timeDifference < rttEstimation) || (rttEstimation == -1.0)) {
rttEstimation = timeDifference;
}
sctpEV3 << simTime() << " processSackArrived: computed rtt time diff == "
<< timeDifference << " for TSN "<< myChunk->tsn << endl;
}
}
if ( (myChunk->hasBeenAbandoned == false) &&
(myChunk->hasBeenReneged == false) ) {
myChunkLastPath->newlyAckedBytes += (myChunk->booksize);
sctpEV3 << simTime() << ": GapAcked TSN " << myChunk->tsn
<< " on path " << myChunkLastPath->remoteAddress << endl;
if (myChunk->tsn > highestNewAck) {
highestNewAck = myChunk->tsn;
}
ackChunk(myChunk);
if (myChunk->countsAsOutstanding) {
decreaseOutstandingBytes(myChunk);
}
if (transmissionQ->getChunk(myChunk->tsn)) { // I.R. 02.01.07
sctpEV3 << "Found TSN " << myChunk->tsn << " in transmissionQ -> remote it" << endl;
transmissionQ->removeMsg(myChunk->tsn);
myChunk->enqueuedInTransmissionQ = false;
CounterMap::iterator q = qCounter.roomTransQ.find(myChunk->getNextDestination());
q->second -= ADD_PADDING(myChunk->len/8+SCTP_DATA_CHUNK_LENGTH);
CounterMap::iterator qb = qCounter.bookedTransQ.find(myChunk->getNextDestination());
qb->second -= myChunk->booksize;
}
myChunk->gapReports = 0;
}
}
| void SCTPAssociation::handleChunkReportedAsMissing | ( | const SCTPSackChunk * | sackChunk, |
| const uint32 | highestNewAck, | ||
| SCTPDataVariables * | myChunk, | ||
| const SCTPPathVariables * | sackPath | ||
| ) | [private] |
Referenced by processSackArrived().
{
SCTPPathVariables* myChunkLastPath = myChunk->getLastDestinationPath();
sctpEV3 << "TSN " << myChunk->tsn << " is missing in gap reports (last "
<< myChunkLastPath->remoteAddress << ") ";
if (!chunkHasBeenAcked(myChunk)) {
sctpEV3 << "has not been acked, highestNewAck=" << highestNewAck
<< " countsAsOutstanding=" << myChunk->countsAsOutstanding << endl;
const uint32 chunkReportedAsMissing = (highestNewAck > myChunk->tsn) ? 1 : 0;
if (chunkReportedAsMissing > 0) {
// T.D. 15.04.09: Increase gapReports by chunkReportedAsMissing.
// Fixed bug here: gapReports += chunkReportedAsMissing instead of gapReports = chunkReportedAsMissing.
/*
printf("GapReports for TSN %u [ret=%d,fast=%s] at t=%s: %d --> %d by %d\n",
myChunk->tsn,
myChunk->numberOfRetransmissions,
(myChunk->hasBeenFastRetransmitted == true) ? "YES" : "no",
simTime().str().c_str(),
myChunk->gapReports,
myChunk->gapReports + chunkReportedAsMissing,
chunkReportedAsMissing);
*/
myChunk->gapReports += chunkReportedAsMissing;
myChunkLastPath->gapNAcksInLastSACK++;
if (myChunk->gapReports >= state->numGapReports) {
bool fastRtx = false;
fastRtx = ((myChunk->hasBeenFastRetransmitted == false) &&
(myChunk->numberOfRetransmissions == 0));
if (fastRtx) {
// ====== Add chunk to transmission queue ========
SCTPQueue::PayloadQueue::iterator it = transmissionQ->payloadQueue.find(myChunk->tsn);
if (transmissionQ->getChunk(myChunk->tsn) == NULL) {
SCTP::AssocStat* assocStat = sctpMain->getAssocStat(assocId);
if(assocStat) {
assocStat->numFastRtx++;
}
myChunk->hasBeenFastRetransmitted = true;
sctpEV3 << simTime() << ": Fast RTX for TSN "
<< myChunk->tsn << " on path " << myChunk->getLastDestination() << endl;
myChunkLastPath->numberOfFastRetransmissions++;
myChunk->setNextDestination(getNextDestination(myChunk));
SCTPPathVariables* myChunkNextPath = myChunk->getNextDestinationPath();
assert(myChunkNextPath != NULL);
if (myChunk->countsAsOutstanding) {
decreaseOutstandingBytes(myChunk);
}
if (!transmissionQ->checkAndInsertChunk(myChunk->tsn, myChunk)) {
sctpEV3 << "Fast RTX: cannot add message/chunk (TSN="
<< myChunk->tsn << ") to the transmissionQ" << endl;
}
else {
myChunk->enqueuedInTransmissionQ = true;
CounterMap::iterator q = qCounter.roomTransQ.find(myChunk->getNextDestination());
q->second += ADD_PADDING(myChunk->len/8+SCTP_DATA_CHUNK_LENGTH);
CounterMap::iterator qb = qCounter.bookedTransQ.find(myChunk->getNextDestination());
qb->second += myChunk->booksize;
}
myChunkNextPath->requiresRtx = true;
if(myChunkNextPath->findLowestTSN == true) {
// TD 08.12.09: fixed detection of lowest TSN retransmitted
myChunkNextPath->lowestTSNRetransmitted = true;
}
}
}
}
}
else {
myChunk->hasBeenFastRetransmitted = false;
sctpEV3 << "TSN " << myChunk->tsn << " countsAsOutstanding="
<< myChunk->countsAsOutstanding << endl;
if (highestNewAck > myChunk->tsn) {
myChunk->gapReports++;
}
myChunkLastPath->gapAcksInLastSACK++;
}
myChunkLastPath->findLowestTSN = false;
}
else {
// Reneging, type 1:
// A chunk in the gap blocks has been un-acked => reneg it.
tsnWasReneged(myChunk,
1);
}
}
| void SCTPAssociation::increaseOutstandingBytes | ( | SCTPDataVariables * | chunk, |
| SCTPPathVariables * | path | ||
| ) | [private] |
Referenced by sendOnPath().
{
path->outstandingBytes += chunk->booksize;
state->outstandingBytes += chunk->booksize;
CounterMap::iterator iterator = qCounter.roomRetransQ.find(path->remoteAddress);
iterator->second += ADD_PADDING(chunk->booksize + SCTP_DATA_CHUNK_LENGTH);
}
| const char * SCTPAssociation::indicationName | ( | const int32 | code | ) | [static] |
Utility: returns name of SCTP_I_xxx constants
Referenced by sendEstabIndicationToApp(), and sendIndicationToApp().
{
#define CASE(x) case x: s=#x+7; break
const char* s = "unknown";
switch (code) {
CASE(SCTP_I_DATA);
CASE(SCTP_I_DATA_NOTIFICATION);
CASE(SCTP_I_ESTABLISHED);
CASE(SCTP_I_PEER_CLOSED);
CASE(SCTP_I_CLOSED);
CASE(SCTP_I_CONNECTION_REFUSED);
CASE(SCTP_I_CONNECTION_RESET);
CASE(SCTP_I_TIMED_OUT);
CASE(SCTP_I_STATUS);
CASE(SCTP_I_ABORT);
CASE(SCTP_I_SHUTDOWN_RECEIVED);
CASE(SCTP_I_SEND_MSG);
CASE(SCTP_I_SENDQUEUE_FULL);
CASE(SCTP_I_SENDQUEUE_ABATED);
}
return s;
#undef CASE
}
| void SCTPAssociation::initAssociation | ( | SCTPOpenCommand * | openCmd | ) | [protected] |
Utility: creates send/receive queues and sctpAlgorithm
Referenced by process_ASSOCIATE(), and process_OPEN_PASSIVE().
{
sctpEV3<<"SCTPAssociationUtil:initAssociation\n";
// create send/receive queues
const char *queueClass = openCmd->getQueueClass();
transmissionQ = check_and_cast<SCTPQueue *>(createOne(queueClass));
retransmissionQ = check_and_cast<SCTPQueue *>(createOne(queueClass));
outboundStreams = openCmd->getOutboundStreams();
// create algorithm
const char *sctpAlgorithmClass = openCmd->getSctpAlgorithmClass();
if (!sctpAlgorithmClass || !sctpAlgorithmClass[0])
sctpAlgorithmClass = sctpMain->par("sctpAlgorithmClass");
sctpAlgorithm = check_and_cast<SCTPAlgorithm *>(createOne(sctpAlgorithmClass));
sctpAlgorithm->setAssociation(this);
sctpAlgorithm->initialize();
// create state block
state = sctpAlgorithm->createStateVariables();
}
| void SCTPAssociation::initCCParameters | ( | SCTPPathVariables * | path | ) | [protected] |
SCTPCCFunctions
Referenced by pmStartPathManagement(), and stateEntered().
| void SCTPAssociation::initStreams | ( | uint32 | inStreams, |
| uint32 | outStreams | ||
| ) | [protected] |
Referenced by SCTPAssociation().
{
uint32 i;
sctpEV3<<"initStreams instreams="<<inStreams<<" outstream="<<outStreams<<"\n";
if (receiveStreams.size()==0 && sendStreams.size()==0)
{
for (i=0; i<inStreams; i++)
{
SCTPReceiveStream* rcvStream = new SCTPReceiveStream();
this->receiveStreams[i]=rcvStream;
rcvStream->setStreamId(i);
this->state->numMsgsReq[i]=0;
}
for (i=0; i<outStreams; i++)
{
SCTPSendStream* sendStream = new SCTPSendStream(i);
this->sendStreams[i]=sendStream;
sendStream->setStreamId(i);
}
}
}
| void SCTPAssociation::loadPacket | ( | SCTPPathVariables * | pathVar, |
| SCTPMessage ** | sctpMsg, | ||
| uint16 * | chunksAdded, | ||
| uint16 * | dataChunksAdded, | ||
| uint32 * | packetBytes, | ||
| bool * | authAdded | ||
| ) | [private] |
Referenced by sendOnPath().
{
*sctpMsg = state->sctpMsg;
*chunksAdded = state->chunksAdded;
*dataChunksAdded = state->dataChunksAdded;
*packetBytes = state->packetBytes;
sctpEV3 << "loadPacket: path=" << pathVar->remoteAddress << " osb=" << pathVar->outstandingBytes << " -> " << pathVar->outstandingBytes + state->packetBytes << endl;
pathVar->outstandingBytes += state->packetBytes;
qCounter.bookedSumSendStreams -= state->packetBytes;
for (uint16 i = 0; i < (*sctpMsg)->getChunksArraySize(); i++)
retransmissionQ->payloadQueue.find(((SCTPDataChunk*)(*sctpMsg)->getChunks(i))->getTsn())->second->countsAsOutstanding = true;
}
| SCTPDataVariables * SCTPAssociation::makeDataVarFromDataMsg | ( | SCTPDataMsg * | datMsg, |
| SCTPPathVariables * | path | ||
| ) | [private] |
Referenced by sendOnPath().
{
SCTPDataVariables* datVar = new SCTPDataVariables();
datMsg->setInitialDestination(path->remoteAddress);
datVar->setInitialDestination(path);
datVar->bbit = datMsg->getBBit();
datVar->ebit = datMsg->getEBit();
datVar->enqueuingTime = datMsg->getEnqueuingTime();
datVar->expiryTime = datMsg->getExpiryTime();
datVar->ppid = datMsg->getPpid();
datVar->len = datMsg->getBitLength();
datVar->sid = datMsg->getSid();
datVar->allowedNoRetransmissions = datMsg->getRtx();
datVar->booksize = datMsg->getBooksize();
// ------ Stream handling ---------------------------------------
SCTPSendStreamMap::iterator iterator = sendStreams.find(datMsg->getSid());
SCTPSendStream* stream = iterator->second;
uint32 nextSSN = stream->getNextStreamSeqNum();
datVar->userData = datMsg->decapsulate();
if (datMsg->getOrdered()) {
// ------ Ordered mode: assign SSN ---------
if (datMsg->getEBit())
{
datVar->ssn = nextSSN++;
}
else
{
datVar->ssn = nextSSN;
}
datVar->ordered = true;
if (nextSSN > 65535) {
stream->setNextStreamSeqNum(0);
}
else {
stream->setNextStreamSeqNum(nextSSN);
}
}
else {
// ------ Ordered mode: no SSN needed ------
datVar->ssn = 0;
datVar->ordered = false;
}
return(datVar);
}
| bool SCTPAssociation::makeRoomForTsn | ( | const uint32 | tsn, |
| const uint32 | length, | ||
| const bool | uBit | ||
| ) | [protected] |
Referenced by processDataArrived().
{
SCTPQueue* stream, dStream;
uint32 sum = 0;
uint32 comp = 0;
bool delQ = false;
uint32 high = state->highestTsnStored;
sctpEV3 << "makeRoomForTsn: tsn=" << tsn
<< ", length=" << length << " high=" << high << endl;
while ((sum < length) && (state->highestTsnReceived>state->lastTsnAck)) {
comp = sum;
for (SCTPReceiveStreamMap::iterator iter = receiveStreams.begin();
iter!=receiveStreams.end(); iter++) {
if (tsn > high) {
return false;
}
if (uBit) {
stream = iter->second->getUnorderedQ();
}
else {
stream = iter->second->getOrderedQ();
}
SCTPDataVariables* chunk = stream->getChunk(high);
if (chunk == NULL) { //12.06.08
sctpEV3 << high << " not found in orderedQ. Try deliveryQ" << endl;
stream = iter->second->getDeliveryQ();
chunk = stream->getChunk(high);
delQ = true;
}
if (chunk != NULL) {
sum+=chunk->len;
if (stream->deleteMsg(high)) {
sctpEV3 << high << " found and deleted" << endl;
state->queuedReceivedBytes-=chunk->len/8; //12.06.08
if (ssnGt(iter->second->getExpectedStreamSeqNum(),chunk->ssn)) {
iter->second->setExpectedStreamSeqNum(chunk->ssn);
}
}
qCounter.roomSumRcvStreams -= ADD_PADDING(chunk->len/8 + SCTP_DATA_CHUNK_LENGTH);
if (high == state->highestTsnReceived) {
state->highestTsnReceived--;
}
removeFromGapList(high);
if (tsn > state->highestTsnReceived) {
state->highestTsnReceived = tsn;
}
high--;
break;
}
else {
sctpEV3 << "TSN " << high << " not found in stream "
<< iter->second->getStreamId() << endl;
}
}
if (comp == sum) {
sctpEV3 << high << " not found in any stream" << endl;
high--;
}
state->highestTsnStored = high;
if (tsn > state->highestTsnReceived) {
return false;
}
}
return true;
}
| SCTPDataVariables * SCTPAssociation::makeVarFromMsg | ( | SCTPDataChunk * | datachunk | ) | [protected] |
Referenced by processDataArrived().
{
SCTPDataVariables* chunk = new SCTPDataVariables();
chunk->bbit = dataChunk->getBBit();
chunk->ebit = dataChunk->getEBit();
chunk->sid = dataChunk->getSid();
chunk->ssn = dataChunk->getSsn();
chunk->ppid = dataChunk->getPpid();
chunk->tsn = dataChunk->getTsn();
if (!dataChunk->getUBit()) {
chunk->ordered = true;
}
else {
chunk->ordered = false;
}
SCTPSimpleMessage* smsg=check_and_cast<SCTPSimpleMessage*>(dataChunk->decapsulate());
chunk->userData = smsg;
chunk->len = smsg->getDataLen()*8;
sctpEV3 << "makeVarFromMsg: queuedBytes has been increased to "
<< state->queuedReceivedBytes << endl;
return chunk;
}
| void SCTPAssociation::moveChunkToOtherPath | ( | SCTPDataVariables * | chunk, |
| SCTPPathVariables * | newPath | ||
| ) | [private] |
Referenced by process_TIMEOUT_RTX().
{
// ====== Prepare next destination =======================================
SCTPPathVariables* lastPath = chunk->getLastDestinationPath();
chunk->hasBeenFastRetransmitted = false;
chunk->gapReports = 0;
chunk->setNextDestination(newPath);
sctpEV3 << simTime() << ": Timer-Based RTX for TSN " << chunk->tsn
<< ": lastDestination=" << chunk->getLastDestination()
<< " nextDestination=" << chunk->getNextDestination() << endl;
// ======= Remove chunk's booksize from outstanding bytes ================
// T.D. 12.02.2010: This case may happen when using sender queue control!
if(chunk->countsAsOutstanding) {
assert(lastPath->outstandingBytes >= chunk->booksize);
lastPath->outstandingBytes -= chunk->booksize;
assert((int32)lastPath->outstandingBytes >= 0);
state->outstandingBytes -= chunk->booksize;
assert((int64)state->outstandingBytes >= 0);
chunk->countsAsOutstanding = false;
// T.D. 12.02.2010: No Timer-Based RTX is necessary any more when there
// are no outstanding bytes!
if(lastPath->outstandingBytes == 0) {
stopTimer(lastPath->T3_RtxTimer);
}
}
// ====== Perform bookkeeping ============================================
// Check, if chunk_ptr->tsn is already in transmission queue.
// This can happen in case multiple timeouts occur in succession.
if (!transmissionQ->checkAndInsertChunk(chunk->tsn, chunk)) {
sctpEV3 << "TSN " << chunk->tsn << " already in transmissionQ" << endl;
return;
}
else {
chunk->enqueuedInTransmissionQ = true;
sctpEV3 << "Inserting TSN " << chunk->tsn << " into transmissionQ" << endl;
CounterMap::iterator q = qCounter.roomTransQ.find(chunk->getNextDestination());
q->second += ADD_PADDING(chunk->len/8+SCTP_DATA_CHUNK_LENGTH);
CounterMap::iterator qb = qCounter.bookedTransQ.find(chunk->getNextDestination());
qb->second += chunk->booksize;
if (chunk->countsAsOutstanding) {
decreaseOutstandingBytes(chunk);
}
state->peerRwnd += (chunk->booksize);
}
if (state->peerRwnd > state->initialPeerRwnd) {
state->peerRwnd = state->initialPeerRwnd;
}
sctpEV3 << "T3 Timeout: Chunk (TSN=" << chunk->tsn
<< ") has been requeued in transmissionQ, rwnd was set to "
<< state->peerRwnd << endl;
}
| bool SCTPAssociation::nextChunkFitsIntoPacket | ( | int32 | bytes | ) | [protected] |
Referenced by sendOnPath().
{
int32 nextStream = -1;
SCTPSendStream* stream;
/* Only change stream if we don't have to finish a fragmented message */
if (state->lastMsgWasFragment)
nextStream = state->lastStreamScheduled;
else
nextStream = (this->*ssFunctions.ssGetNextSid)(true);
if (nextStream == -1)
return false;
stream = sendStreams.find(nextStream)->second;
if (stream)
{
cQueue* streamQ = NULL;
if (!stream->getUnorderedStreamQ()->empty())
streamQ = stream->getUnorderedStreamQ();
else if (!stream->getStreamQ()->empty())
streamQ = stream->getStreamQ();
if (streamQ)
{
int32 b=ADD_PADDING( (check_and_cast<SCTPSimpleMessage*>(((SCTPDataMsg*)streamQ->front())->getEncapsulatedPacket())->getByteLength()+SCTP_DATA_CHUNK_LENGTH));
/* Check if next message would be fragmented */
if (b > (int32) state->assocPmtu - IP_HEADER_LENGTH - SCTP_COMMON_HEADER)
{
/* Test if fragment fits */
if (bytes >= (int32) state->assocPmtu - IP_HEADER_LENGTH - SCTP_COMMON_HEADER - SCTP_DATA_CHUNK_LENGTH)
return true;
else
return false;
}
/* Message doesn't need to be fragmented, just try if it fits */
if (b <= bytes)
return true;
else
return false;
}
}
return false;
}
| int32 SCTPAssociation::numUsableStreams | ( | void | ) | [protected] |
Referenced by SCTPAssociation().
{
int32 count=0;
for (SCTPSendStreamMap::iterator iter=sendStreams.begin(); iter!=sendStreams.end(); iter++)
if (iter->second->getStreamQ()->length()>0 || iter->second->getUnorderedStreamQ()->length()>0)
{
count++;
}
return count;
}
| void SCTPAssociation::pathStatusIndication | ( | const SCTPPathVariables * | path, |
| const bool | status | ||
| ) | [protected] |
Referenced by pmClearPathCounter(), process_TIMEOUT_HEARTBEAT(), process_TIMEOUT_RTX(), and updateCounters().
{
cPacket* msg = new cPacket("StatusInfo");
msg->setKind(SCTP_I_STATUS);
SCTPStatusInfo* cmd = new SCTPStatusInfo();
cmd->setPathId(path->remoteAddress);
cmd->setAssocId(assocId);
cmd->setActive(status);
msg->setControlInfo(cmd);
if (!status) {
SCTP::AssocStatMap::iterator iter=sctpMain->assocStatMap.find(assocId);
iter->second.numPathFailures++;
}
sendToApp(msg);
}
| SCTPDataVariables * SCTPAssociation::peekAbandonedChunk | ( | const SCTPPathVariables * | path | ) | [protected] |
{
// Are there chunks in the retransmission queue? If Yes -> dequeue and return it.
if (!retransmissionQ->payloadQueue.empty())
{
for(SCTPQueue::PayloadQueue::iterator it = retransmissionQ->payloadQueue.begin();
it != retransmissionQ->payloadQueue.end(); it++) {
SCTPDataVariables* chunk = it->second;
sctpEV3<<"peek Chunk "<<chunk->tsn<<"\n";
if (chunk->getLastDestinationPath() == path && chunk->hasBeenAbandoned) {
sctpEV3<<"peekAbandonedChunk() found chunk in the retransmission queue\n";
return chunk;
}
}
}
return NULL;
}
| SCTPDataMsg * SCTPAssociation::peekOutboundDataMsg | ( | ) | [protected] |
Referenced by calculateBytesToSendOnPath().
{
SCTPDataMsg* datMsg=NULL;
int32 nextStream = -1;
nextStream = (this->*ssFunctions.ssGetNextSid)(true);
if (nextStream == -1)
{
sctpEV3<<"peekOutboundDataMsg(): no valid stream found -> returning NULL !\n";
return NULL;
}
for (SCTPSendStreamMap::iterator iter=sendStreams.begin(); iter!=sendStreams.end(); ++iter)
{
if ((int32)iter->first==nextStream)
{
SCTPSendStream* stream=iter->second;
if (!stream->getUnorderedStreamQ()->empty())
{
return (datMsg);
}
if (!stream->getStreamQ()->empty())
{
return (datMsg);
}
}
}
return NULL;
}
| bool SCTPAssociation::performStateTransition | ( | const SCTPEventCode & | event | ) | [protected] |
Implemements the pure SCTP state machine
Referenced by process_RCV_Message(), processAppCommand(), processCookieAckArrived(), processCookieEchoArrived(), processInitAckArrived(), processInitArrived(), processTimer(), pushUlp(), sendShutdown(), and sendShutdownAck().
{
sctpEV3<<"performStateTransition\n";
if (event==SCTP_E_IGNORE) // e.g. discarded segment
{
ev << "Staying in state: " << stateName(fsm->getState()) << " (no FSM event)\n";
return true;
}
// state machine
int32 oldState = fsm->getState();
switch (fsm->getState())
{
case SCTP_S_CLOSED:
switch (event)
{
case SCTP_E_ABORT: FSM_Goto((*fsm), SCTP_S_CLOSED); break;
case SCTP_E_OPEN_PASSIVE: FSM_Goto((*fsm), SCTP_S_CLOSED); break;
case SCTP_E_ASSOCIATE: FSM_Goto((*fsm), SCTP_S_COOKIE_WAIT); break;
case SCTP_E_RCV_INIT: FSM_Goto((*fsm), SCTP_S_CLOSED); break;
case SCTP_E_RCV_ABORT: FSM_Goto((*fsm), SCTP_S_CLOSED); break;
case SCTP_E_RCV_VALID_COOKIE_ECHO: FSM_Goto((*fsm), SCTP_S_ESTABLISHED); break;
case SCTP_E_CLOSE: FSM_Goto((*fsm), SCTP_S_CLOSED); break;
default:;
}
break;
case SCTP_S_COOKIE_WAIT:
switch (event)
{
case SCTP_E_RCV_ABORT: FSM_Goto((*fsm), SCTP_S_CLOSED); break;
case SCTP_E_ABORT: FSM_Goto((*fsm), SCTP_S_CLOSED); break;
case SCTP_E_RCV_INIT_ACK: FSM_Goto((*fsm), SCTP_S_COOKIE_ECHOED); break;
case SCTP_E_RCV_VALID_COOKIE_ECHO: FSM_Goto((*fsm), SCTP_S_ESTABLISHED); break;
default:;
}
break;
case SCTP_S_COOKIE_ECHOED:
switch (event)
{
case SCTP_E_RCV_ABORT: FSM_Goto((*fsm), SCTP_S_CLOSED); break;
case SCTP_E_ABORT: FSM_Goto((*fsm), SCTP_S_CLOSED); break;
case SCTP_E_RCV_COOKIE_ACK:FSM_Goto((*fsm), SCTP_S_ESTABLISHED); break;
default:;
}
break;
case SCTP_S_ESTABLISHED:
switch (event)
{
case SCTP_E_SEND: FSM_Goto((*fsm), SCTP_S_ESTABLISHED); break;
case SCTP_E_ABORT: FSM_Goto((*fsm), SCTP_S_CLOSED); break;
case SCTP_E_RCV_ABORT: FSM_Goto((*fsm), SCTP_S_CLOSED); break;
case SCTP_E_SHUTDOWN: FSM_Goto((*fsm), SCTP_S_SHUTDOWN_PENDING); break;
case SCTP_E_STOP_SENDING: FSM_Goto((*fsm), SCTP_S_SHUTDOWN_PENDING); state->stopSending = true; state->lastTSN = state->nextTSN-1; break; //I.R.
case SCTP_E_RCV_SHUTDOWN: FSM_Goto((*fsm), SCTP_S_SHUTDOWN_RECEIVED); break;
case SCTP_E_CLOSE: FSM_Goto((*fsm), SCTP_S_CLOSED); break;
default:;
}
break;
case SCTP_S_SHUTDOWN_PENDING:
switch (event)
{
case SCTP_E_RCV_ABORT: FSM_Goto((*fsm), SCTP_S_CLOSED); break;
case SCTP_E_ABORT: FSM_Goto((*fsm), SCTP_S_CLOSED); break;
case SCTP_E_NO_MORE_OUTSTANDING: FSM_Goto((*fsm), SCTP_S_SHUTDOWN_SENT); break;
case SCTP_E_RCV_SHUTDOWN: FSM_Goto((*fsm), SCTP_S_SHUTDOWN_RECEIVED); break;
case SCTP_E_RCV_SHUTDOWN_ACK: FSM_Goto((*fsm), SCTP_S_CLOSED); break;
default:;
}
break;
case SCTP_S_SHUTDOWN_RECEIVED:
switch (event)
{
case SCTP_E_ABORT: FSM_Goto((*fsm), SCTP_S_CLOSED); break;
case SCTP_E_RCV_ABORT: FSM_Goto((*fsm), SCTP_S_CLOSED); break;
case SCTP_E_NO_MORE_OUTSTANDING:
FSM_Goto((*fsm), SCTP_S_SHUTDOWN_ACK_SENT);
break;
case SCTP_E_SHUTDOWN: sendShutdownAck(remoteAddr); /*FSM_Goto((*fsm), SCTP_S_SHUTDOWN_ACK_SENT);*/ break;
default:;
}
break;
case SCTP_S_SHUTDOWN_SENT:
switch (event)
{
case SCTP_E_ABORT: FSM_Goto((*fsm), SCTP_S_CLOSED); break;
case SCTP_E_RCV_ABORT: FSM_Goto((*fsm), SCTP_S_CLOSED); break;
case SCTP_E_RCV_SHUTDOWN_ACK: FSM_Goto((*fsm), SCTP_S_CLOSED); break;
case SCTP_E_RCV_SHUTDOWN: sendShutdownAck(remoteAddr); FSM_Goto((*fsm), SCTP_S_SHUTDOWN_ACK_SENT); break;
default:;
}
break;
case SCTP_S_SHUTDOWN_ACK_SENT:
switch (event)
{
case SCTP_E_ABORT: FSM_Goto((*fsm), SCTP_S_CLOSED); break;
case SCTP_E_RCV_ABORT: FSM_Goto((*fsm), SCTP_S_CLOSED); break;
case SCTP_E_RCV_SHUTDOWN_COMPLETE: FSM_Goto((*fsm), SCTP_S_CLOSED); break;
default:;
}
break;
}
if (oldState!=fsm->getState())
{
ev << "Transition: " << stateName(oldState) << " --> " << stateName(fsm->getState()) << " (event was: " << eventName(event) << ")\n";
sctpEV3 << sctpMain->getName() << ": " << stateName(oldState) << " --> " << stateName(fsm->getState()) << " (on " << eventName(event) << ")\n";
stateEntered(fsm->getState());
}
else
{
ev<< "Staying in state: " << stateName(fsm->getState()) << " (event was: " << eventName(event) << ")\n";
}
if (event==SCTP_E_ABORT && oldState==fsm->getState() && fsm->getState()==SCTP_S_CLOSED)
return true;
if (oldState!=fsm->getState() && fsm->getState()==SCTP_S_CLOSED)
{
sctpEV3<<"return false because oldState="<<oldState<<" and new state is closed\n";
return false;
}
else
return true;
}
| void SCTPAssociation::pmClearPathCounter | ( | SCTPPathVariables * | path | ) | [protected] |
Referenced by processHeartbeatAckArrived(), and processSackArrived().
{
path->pathErrorCount = 0;
if (path->activePath == false) {
/* notify the application */
pathStatusIndication(path, true);
sctpEV3 << "Path " << path->remoteAddress
<< " state changes from INACTIVE to ACTIVE !!!" << endl;
}
}
| void SCTPAssociation::pmDataIsSentOn | ( | SCTPPathVariables * | path | ) | [protected] |
Referenced by sendOnPath().
{
/* restart hb_timer on this path */
stopTimer(path->HeartbeatTimer);
if (state->enableHeartbeats)
{
path->heartbeatTimeout = path->pathRto + (double)sctpMain->par("hbInterval");
startTimer(path->HeartbeatTimer, path->heartbeatTimeout);
sctpEV3 << "Restarting HB timer on path "<< path->remoteAddress
<< " to expire at time " << path->heartbeatTimeout << endl;
}
path->cwndTimeout = path->pathRto;
stopTimer(path->CwndTimer);
startTimer(path->CwndTimer, path->cwndTimeout);
sctpEV3 << "Restarting CWND timer on path "<< path->remoteAddress
<< " to expire at time " << path->cwndTimeout << endl;
}
| void SCTPAssociation::pmRttMeasurement | ( | SCTPPathVariables * | path, |
| const simtime_t & | rttEstimation | ||
| ) | [protected] |
Referenced by processHeartbeatAckArrived(), and processSackArrived().
{
if (rttEstimation < MAXTIME) {
if (simTime() > path->updateTime) {
if (path->updateTime == SIMTIME_ZERO) {
path->rttvar = rttEstimation.dbl() / 2;
path->srtt = rttEstimation;
path->pathRto = 3.0 * rttEstimation.dbl();
path->pathRto = max(min(path->pathRto.dbl(), (double)sctpMain->par("rtoMax")),
(double)sctpMain->par("rtoMin"));
}
else {
path->rttvar = (1.0 - (double)sctpMain->par("rtoBeta")) * path->rttvar.dbl() +
(double)sctpMain->par("rtoBeta") * fabs(path->srtt.dbl() - rttEstimation.dbl());
path->srtt = (1.0 - (double)sctpMain->par("rtoAlpha")) * path->srtt.dbl() +
(double)sctpMain->par("rtoAlpha") * rttEstimation.dbl();
path->pathRto = path->srtt.dbl() + 4.0 * path->rttvar.dbl();
path->pathRto = max(min(path->pathRto.dbl(), (double)sctpMain->par("rtoMax")),
(double)sctpMain->par("rtoMin"));
}
/*
std::cout << simTime() << ": Updating timer values for path " << path->remoteAddress << ":"
<< " RTO=" << path->pathRto
<< " rttEstimation=" << rttEstimation
<< " SRTT=" << path->srtt
<< " --> RTTVAR=" << path->rttvar << endl;
*/
// RFC 2960, sect. 6.3.1: new RTT measurements SHOULD be made no more
// than once per round-trip.
path->updateTime = simTime() + path->srtt;
path->statisticsPathRTO->record(path->pathRto);
path->statisticsPathRTT->record(rttEstimation);
}
}
}
| void SCTPAssociation::pmStartPathManagement | ( | ) | [protected] |
Flow control
Referenced by stateEntered().
{
RoutingTableAccess routingTableAccess;
SCTPPathVariables* path;
int32 i=0;
/* populate path structures !!! */
/* set a high start value...this is appropriately decreased later (below) */
state->assocPmtu = state->localRwnd;
for(SCTPPathMap::iterator piter=sctpPathMap.begin(); piter!=sctpPathMap.end(); piter++)
{
path=piter->second;
path->pathErrorCount = 0;
InterfaceEntry *rtie = routingTableAccess.get()->getInterfaceForDestAddr(path->remoteAddress.get4());
path->pmtu = rtie->getMTU();
sctpEV3 << "Path MTU of Interface "<< i << " = " << path->pmtu <<"\n";
if (path->pmtu < state->assocPmtu)
{
state->assocPmtu = path->pmtu;
}
initCCParameters(path);
path->pathRto = (double)sctpMain->par("rtoInitial");
path->srtt = path->pathRto;
path->rttvar = SIMTIME_ZERO;
/* from now on we may have one update per RTO/SRTT */
path->updateTime = SIMTIME_ZERO;
path->partialBytesAcked = 0;
path->outstandingBytes = 0;
path->activePath = true;
// Timer probably not running, but stop it anyway I.R.
stopTimer(path->T3_RtxTimer);
if (path->remoteAddress == state->initialPrimaryPath && !path->confirmed) {
path->confirmed = true;
}
sctpEV3<<getFullPath()<<" numberOfLocalAddresses="<<state->localAddresses.size()<<"\n";
path->heartbeatTimeout= (double)sctpMain->par("hbInterval")+i*path->pathRto;
stopTimer(path->HeartbeatTimer);
sendHeartbeat(path);
startTimer(path->HeartbeatTimer, path->heartbeatTimeout);
startTimer(path->HeartbeatIntervalTimer, path->heartbeatIntervalTimeout);
path->statisticsPathRTO->record(path->pathRto);
i++;
}
}
| SCTPEventCode SCTPAssociation::preanalyseAppCommandEvent | ( | int32 | commandCode | ) | [protected] |
Maps app command codes (msg kind of app command msgs) to SCTP_E_xxx event codes
Referenced by processAppCommand().
{
switch (commandCode) {
case SCTP_C_ASSOCIATE: return SCTP_E_ASSOCIATE;
case SCTP_C_OPEN_PASSIVE: return SCTP_E_OPEN_PASSIVE;
case SCTP_C_SEND: return SCTP_E_SEND;
case SCTP_C_CLOSE: return SCTP_E_CLOSE;
case SCTP_C_ABORT: return SCTP_E_ABORT;
case SCTP_C_RECEIVE: return SCTP_E_RECEIVE;
case SCTP_C_SEND_UNORDERED: return SCTP_E_SEND;
case SCTP_C_SEND_ORDERED: return SCTP_E_SEND;
case SCTP_C_PRIMARY: return SCTP_E_PRIMARY;
case SCTP_C_QUEUE_MSGS_LIMIT: return SCTP_E_QUEUE_MSGS_LIMIT;
case SCTP_C_QUEUE_BYTES_LIMIT: return SCTP_E_QUEUE_BYTES_LIMIT;
case SCTP_C_SHUTDOWN: return SCTP_E_SHUTDOWN;
case SCTP_C_NO_OUTSTANDING: return SCTP_E_SEND_SHUTDOWN_ACK;
default:
sctpEV3<<"commandCode="<<commandCode<<"\n";
opp_error("Unknown message kind in app command");
return (SCTPEventCode)0; // to satisfy compiler
}
}
| void SCTPAssociation::printConnBrief | ( | ) | [protected] |
Utility: prints local/remote addr/port and app gate index/assocId
Referenced by processAppCommand(), and processSCTPMessage().
{
sctpEV3 << "Connection " << this << " ";
sctpEV3 << localAddr << ":" << localPort << " to " << remoteAddr << ":" << remotePort;
sctpEV3 << " on app[" << appGateIndex << "],assocId=" << assocId;
sctpEV3 << " in " << stateName(fsm->getState()) << "\n";
}
| void SCTPAssociation::printOutstandingTsns | ( | ) | [protected] |
| void SCTPAssociation::printSctpPathMap | ( | ) | const |
Referenced by processInitAckArrived(), processInitArrived(), sendInit(), and sendInitAck().
{
sctpEV3 <<"SCTP PathMap:" << endl;
for (SCTPPathMap::const_iterator iterator = sctpPathMap.begin();
iterator != sctpPathMap.end(); ++iterator) {
const SCTPPathVariables* path = iterator->second;
sctpEV3 << " - " << path->remoteAddress << ": osb=" << path->outstandingBytes
<< " cwnd=" << path->cwnd << endl;
}
}
| void SCTPAssociation::printSegmentBrief | ( | SCTPMessage * | sctpmsg | ) | [static, protected] |
| void SCTPAssociation::process_ABORT | ( | SCTPEventCode & | event | ) | [protected] |
Referenced by processAppCommand().
{
sctpEV3 << "SCTPAssociationEventProc:process_ABORT; assoc=" << assocId << endl;
switch(fsm->getState()) {
case SCTP_S_ESTABLISHED:
sendOnAllPaths(state->getPrimaryPath());
sendAbort();
break;
}
}
| void SCTPAssociation::process_ASSOCIATE | ( | SCTPEventCode & | event, |
| SCTPCommand * | sctpCommand, | ||
| cPacket * | msg | ||
| ) | [protected] |
Referenced by processAppCommand().
{
IPvXAddress lAddr, rAddr;
SCTPOpenCommand *openCmd = check_and_cast<SCTPOpenCommand *>(sctpCommand);
ev<<"SCTPAssociationEventProc:process_ASSOCIATE\n";
switch(fsm->getState())
{
case SCTP_S_CLOSED:
initAssociation(openCmd);
state->active = true;
localAddressList = openCmd->getLocalAddresses();
lAddr = openCmd->getLocalAddresses().front();
if (!(openCmd->getRemoteAddresses().empty()))
{
remoteAddressList = openCmd->getRemoteAddresses();
rAddr = openCmd->getRemoteAddresses().front();
}
else
rAddr = openCmd->getRemoteAddr();
localPort = openCmd->getLocalPort();
remotePort = openCmd->getRemotePort();
state->numRequests = openCmd->getNumRequests();
if (rAddr.isUnspecified() || remotePort==0)
opp_error("Error processing command OPEN_ACTIVE: remote address and port must be specified");
if (localPort==0)
{
localPort = sctpMain->getEphemeralPort();
}
ev << "OPEN: " << lAddr << ":" << localPort << " --> " << rAddr << ":" << remotePort << "\n";
sctpMain->updateSockPair(this, lAddr, rAddr, localPort, remotePort);
state->localRwnd = (long)sctpMain->par("arwnd");
sendInit();
startTimer(T1_InitTimer,state->initRexmitTimeout);
break;
default:
opp_error("Error processing command OPEN_ACTIVE: connection already exists");
}
}
| void SCTPAssociation::process_CLOSE | ( | SCTPEventCode & | event | ) | [protected] |
Referenced by processAppCommand().
{
sctpEV3 << "SCTPAssociationEventProc:process_CLOSE; assoc=" << assocId << endl;
switch(fsm->getState()) {
case SCTP_S_ESTABLISHED:
sendOnAllPaths(state->getPrimaryPath());
sendShutdown();
break;
case SCTP_S_SHUTDOWN_RECEIVED:
if (getOutstandingBytes() == 0) {
sendShutdownAck(remoteAddr);
}
break;
}
}
| void SCTPAssociation::process_OPEN_PASSIVE | ( | SCTPEventCode & | event, |
| SCTPCommand * | sctpCommand, | ||
| cPacket * | msg | ||
| ) | [protected] |
Referenced by processAppCommand().
{
IPvXAddress lAddr;
int16 localPort;
SCTPOpenCommand *openCmd = check_and_cast<SCTPOpenCommand *>(sctpCommand);
sctpEV3<<"SCTPAssociationEventProc:process_OPEN_PASSIVE\n";
switch(fsm->getState())
{
case SCTP_S_CLOSED:
initAssociation(openCmd);
state->fork = openCmd->getFork();
localAddressList = openCmd->getLocalAddresses();
sctpEV3<<"process_OPEN_PASSIVE: number of local addresses="<<localAddressList.size()<<"\n";
lAddr = openCmd->getLocalAddresses().front();
localPort = openCmd->getLocalPort();
inboundStreams = openCmd->getInboundStreams();
outboundStreams = openCmd->getOutboundStreams();
state->localRwnd = (long)sctpMain->par("arwnd");
state->numRequests = openCmd->getNumRequests();
state->messagesToPush = openCmd->getMessagesToPush();
if (localPort==0)
opp_error("Error processing command OPEN_PASSIVE: local port must be specified");
sctpEV3 << "Assoc "<<assocId<<"::Starting to listen on: " << lAddr << ":" << localPort << "\n";
sctpMain->updateSockPair(this, lAddr, IPvXAddress(), localPort, 0);
break;
default:
opp_error("Error processing command OPEN_PASSIVE: connection already exists");
}
}
| void SCTPAssociation::process_PRIMARY | ( | SCTPEventCode & | event, |
| SCTPCommand * | sctpCommand | ||
| ) | [protected] |
Referenced by processAppCommand().
{
SCTPPathInfo *pinfo = check_and_cast<SCTPPathInfo *>(sctpCommand);
state->setPrimaryPath(getPath(pinfo->getRemoteAddress()));
}
| void SCTPAssociation::process_QUEUE_BYTES_LIMIT | ( | const SCTPCommand * | sctpCommand | ) | [protected] |
Referenced by processAppCommand().
{
const SCTPInfo* qinfo = check_and_cast<const SCTPInfo*>(sctpCommand);
state->sendQueueLimit = qinfo->getText();
}
| void SCTPAssociation::process_QUEUE_MSGS_LIMIT | ( | const SCTPCommand * | sctpCommand | ) | [protected] |
Queue Management
Referenced by processAppCommand().
{
const SCTPInfo* qinfo = check_and_cast<const SCTPInfo*>(sctpCommand);
state->queueLimit = qinfo->getText();
sctpEV3<<"state->queueLimit set to "<<state->queueLimit<<"\n";
}
| bool SCTPAssociation::process_RCV_Message | ( | SCTPMessage * | sctpseg, |
| const IPvXAddress & | src, | ||
| const IPvXAddress & | dest | ||
| ) | [protected] |
Referenced by processSCTPMessage().
{
// ====== Header checks ==================================================
sctpEV3 << getFullPath() << " SCTPAssociationRcvMessage:process_RCV_Message"
<< " localAddr=" << localAddr
<< " remoteAddr=" << remoteAddr << endl;
if ((sctpmsg->hasBitError() || !sctpmsg->getChecksumOk()))
{
if (((SCTPChunk*)(sctpmsg->getChunks(0)))->getChunkType() == INIT_ACK) {
stopTimer(T1_InitTimer);
sctpEV3 << "InitAck with bit-error. Retransmit Init" << endl;
retransmitInit();
startTimer(T1_InitTimer,state->initRexmitTimeout);
}
if (((SCTPChunk*)(sctpmsg->getChunks(0)))->getChunkType() == COOKIE_ACK) {
stopTimer(T1_InitTimer);
sctpEV3 << "CookieAck with bit-error. Retransmit CookieEcho" << endl;
retransmitCookieEcho();
startTimer(T1_InitTimer,state->initRexmitTimeout);
}
}
SCTPPathVariables* path = getPath(src);
const uint16 srcPort = sctpmsg->getDestPort();
const uint16 destPort = sctpmsg->getSrcPort();
const uint32 numberOfChunks = sctpmsg->getChunksArraySize();
sctpEV3 << "numberOfChunks=" <<numberOfChunks << endl;
state->sctpmsg = (SCTPMessage*)sctpmsg->dup();
// ====== Handle chunks ==================================================
bool trans = true;
bool sendAllowed = false;
bool dupReceived = false;
bool dataChunkReceived = false;
bool dataChunkDelivered = false;
bool shutdownCalled = false;
for (uint32 i = 0; i < numberOfChunks; i++) {
const SCTPChunk* header = (const SCTPChunk*)(sctpmsg->removeChunk());
const uint8 type = header->getChunkType();
if ((type != INIT) &&
(type != ABORT) &&
(type != ERRORTYPE) &&
(sctpmsg->getTag() != peerVTag)) {
sctpEV3 << " VTag "<< sctpmsg->getTag() << " incorrect. Should be "
<< peerVTag << " localVTag=" << localVTag << endl;
return true;
}
switch (type) {
case INIT:
sctpEV3 << "SCTPAssociationRcvMessage: INIT received" << endl;
SCTPInitChunk* initChunk;
initChunk = check_and_cast<SCTPInitChunk*>(header);
if ((initChunk->getNoInStreams() != 0) &&
(initChunk->getNoOutStreams() != 0) &&
(initChunk->getInitTag() != 0)) {
trans = processInitArrived(initChunk, srcPort, destPort);
}
i = numberOfChunks-1;
delete initChunk;
break;
case INIT_ACK:
sctpEV3 << "SCTPAssociationRcvMessage: INIT_ACK received" << endl;
if (fsm->getState() == SCTP_S_COOKIE_WAIT) {
SCTPInitAckChunk* initAckChunk;
initAckChunk = check_and_cast<SCTPInitAckChunk*>(header);
if ((initAckChunk->getNoInStreams() != 0) &&
(initAckChunk->getNoOutStreams() != 0) &&
(initAckChunk->getInitTag() != 0)) {
trans = processInitAckArrived(initAckChunk);
}
else if (initAckChunk->getInitTag() == 0) {
sendAbort();
sctpMain->removeAssociation(this);
}
i = numberOfChunks-1;
delete initAckChunk;
}
else {
sctpEV3 << "INIT_ACK will be ignored" << endl;
}
break;
case COOKIE_ECHO:
sctpEV3 << "SCTPAssociationRcvMessage: COOKIE_ECHO received" << endl;
SCTPCookieEchoChunk* cookieEchoChunk;
cookieEchoChunk = check_and_cast<SCTPCookieEchoChunk*>(header);
trans = processCookieEchoArrived(cookieEchoChunk,src);
delete cookieEchoChunk;
break;
case COOKIE_ACK:
sctpEV3 << "SCTPAssociationRcvMessage: COOKIE_ACK received" << endl;
if (fsm->getState() == SCTP_S_COOKIE_ECHOED) {
SCTPCookieAckChunk* cookieAckChunk;
cookieAckChunk = check_and_cast<SCTPCookieAckChunk*>(header);
trans = processCookieAckArrived();
delete cookieAckChunk;
}
break;
case DATA:
sctpEV3 << "SCTPAssociationRcvMessage: DATA received" << endl;
if (fsm->getState() == SCTP_S_COOKIE_ECHOED) {
trans = performStateTransition(SCTP_E_RCV_COOKIE_ACK);
}
if (!(fsm->getState() == SCTP_S_SHUTDOWN_RECEIVED || fsm->getState() == SCTP_S_SHUTDOWN_ACK_SENT)) {
SCTPDataChunk* dataChunk;
dataChunk = check_and_cast<SCTPDataChunk*>(header);
if ((dataChunk->getByteLength()- 16) > 0) {
const SCTPEventCode event = processDataArrived(dataChunk);
if (event == SCTP_E_DELIVERED) {
dataChunkReceived = true;
dataChunkDelivered = true;
state->sackAllowed = true;
}
else if (event==SCTP_E_SEND || event==SCTP_E_IGNORE) {
dataChunkReceived = true;
state->sackAllowed = true;
}
else if (event==SCTP_E_DUP_RECEIVED) {
dupReceived = true;
}
}
else {
sendAbort();
sctpMain->removeAssociation(this);
}
delete dataChunk;
}
trans = true;
break;
case SACK:
{
sctpEV3 << "SCTPAssociationRcvMessage: SACK received" << endl;
const int32 scount = qCounter.roomSumSendStreams;
SCTPSackChunk* sackChunk;
sackChunk = check_and_cast<SCTPSackChunk*>(header);
processSackArrived(sackChunk);
trans = true;
sendAllowed = true;
delete sackChunk;
if (getOutstandingBytes()==0 && transmissionQ->getQueueSize()==0 && scount==0) {
if (fsm->getState() == SCTP_S_SHUTDOWN_PENDING) {
sctpEV3 << "No more packets: send SHUTDOWN" << endl;
sendShutdown();
trans = performStateTransition(SCTP_E_NO_MORE_OUTSTANDING);
shutdownCalled = true;
}
else if (fsm->getState() == SCTP_S_SHUTDOWN_RECEIVED) {
sctpEV3 << "No more outstanding" << endl;
sendShutdownAck(remoteAddr);
}
}
break;
}
case ABORT:
SCTPAbortChunk* abortChunk;
abortChunk = check_and_cast<SCTPAbortChunk*>(header);
sctpEV3 << "SCTPAssociationRcvMessage: ABORT with T-Bit "
<< abortChunk->getT_Bit() << " received" << endl;
if (sctpmsg->getTag() == localVTag || sctpmsg->getTag() == peerVTag) {
sendIndicationToApp(SCTP_I_ABORT);
trans = performStateTransition(SCTP_E_ABORT);
}
delete abortChunk;
break;
case HEARTBEAT:
sctpEV3 << "SCTPAssociationRcvMessage: HEARTBEAT received" << endl;
SCTPHeartbeatChunk* heartbeatChunk;
heartbeatChunk = check_and_cast<SCTPHeartbeatChunk*>(header);
if (!(fsm->getState() == SCTP_S_CLOSED)) {
sendHeartbeatAck(heartbeatChunk, dest, src);
}
trans = true;
delete heartbeatChunk;
if (path) {
path->numberOfHeartbeatsRcvd++;
path->pathRcvdHb->record(path->numberOfHeartbeatsRcvd);
}
break;
case HEARTBEAT_ACK:
sctpEV3 << "SCTPAssociationRcvMessage: HEARTBEAT_ACK received" << endl;
if (fsm->getState() == SCTP_S_COOKIE_ECHOED) {
trans = performStateTransition(SCTP_E_RCV_COOKIE_ACK);
}
SCTPHeartbeatAckChunk* heartbeatAckChunk;
heartbeatAckChunk = check_and_cast<SCTPHeartbeatAckChunk*>(header);
if (path) {
processHeartbeatAckArrived(heartbeatAckChunk, path);
}
trans = true;
delete heartbeatAckChunk;
break;
case SHUTDOWN:
sctpEV3 << "SCTPAssociationRcvMessage: SHUTDOWN received" << endl;
SCTPShutdownChunk* shutdownChunk;
shutdownChunk = check_and_cast<SCTPShutdownChunk*>(header);
if (shutdownChunk->getCumTsnAck()>state->lastTsnAck) {
simtime_t rttEstimation = MAXTIME;
dequeueAckedChunks(shutdownChunk->getCumTsnAck(),
getPath(remoteAddr), rttEstimation);
state->lastTsnAck = shutdownChunk->getCumTsnAck();
}
trans = performStateTransition(SCTP_E_RCV_SHUTDOWN);
sendIndicationToApp(SCTP_I_SHUTDOWN_RECEIVED);
trans = true;
delete shutdownChunk;
break;
case SHUTDOWN_ACK:
sctpEV3 << "SCTPAssociationRcvMessage: SHUTDOWN_ACK received" << endl;
if (fsm->getState()!=SCTP_S_ESTABLISHED) {
SCTPShutdownAckChunk* shutdownAckChunk;
shutdownAckChunk = check_and_cast<SCTPShutdownAckChunk*>(header);
sendShutdownComplete();
stopTimers();
stopTimer(T2_ShutdownTimer);
stopTimer(T5_ShutdownGuardTimer);
sctpEV3 << "state=" << stateName(fsm->getState()) << endl;
if ((fsm->getState() == SCTP_S_SHUTDOWN_SENT) ||
(fsm->getState() == SCTP_S_SHUTDOWN_ACK_SENT)) {
trans = performStateTransition(SCTP_E_RCV_SHUTDOWN_ACK);
sendIndicationToApp(SCTP_I_CLOSED);
delete state->shutdownChunk;
}
delete shutdownAckChunk;
}
break;
case SHUTDOWN_COMPLETE:
sctpEV3<<"Shutdown Complete arrived" << endl;
SCTPShutdownCompleteChunk* shutdownCompleteChunk;
shutdownCompleteChunk = check_and_cast<SCTPShutdownCompleteChunk*>(header);
trans = performStateTransition(SCTP_E_RCV_SHUTDOWN_COMPLETE);
sendIndicationToApp(SCTP_I_PEER_CLOSED); // necessary for NAT-Rendezvous
if (trans == true) {
stopTimers();
}
stopTimer(T2_ShutdownTimer);
stopTimer(T5_ShutdownGuardTimer);
delete state->shutdownAckChunk;
delete shutdownCompleteChunk;
break;
default: sctpEV3<<"different type" << endl;
break;
}
if (i==numberOfChunks-1 && (dataChunkReceived || dupReceived)) {
sendAllowed=true;
sctpEV3 << "i=" << i << " sendAllowed=true; scheduleSack" << endl;
scheduleSack();
if (fsm->getState() == SCTP_S_SHUTDOWN_SENT && state->ackState>=sackFrequency) {
sendSack();
}
}
// Send any new DATA chunks, SACK chunks, HEARTBEAT chunks etc.
sctpEV3 << "SCTPAssociationRcvMessage: send new data? state=" << stateName(fsm->getState())
<< " sendAllowed=" << sendAllowed
<< " shutdownCalled=" << shutdownCalled << endl;
if (((fsm->getState() == SCTP_S_ESTABLISHED) ||
(fsm->getState() == SCTP_S_SHUTDOWN_PENDING) ||
(fsm->getState() == SCTP_S_SHUTDOWN_RECEIVED)) &&
(sendAllowed) &&
(!shutdownCalled)) {
sendOnAllPaths(state->getPrimaryPath());
}
}
// ====== Clean-up =======================================================
disposeOf(state->sctpmsg);
return trans;
}
| void SCTPAssociation::process_RECEIVE_REQUEST | ( | SCTPEventCode & | event, |
| SCTPCommand * | sctpCommand | ||
| ) | [protected] |
Referenced by processAppCommand().
{
SCTPSendCommand *sendCommand = check_and_cast<SCTPSendCommand *>(sctpCommand);
if ((uint32)sendCommand->getSid() > inboundStreams || sendCommand->getSid() < 0)
{
sctpEV3<<"Application tries to read from invalid stream id....\n";
}
state->numMsgsReq[sendCommand->getSid()]+= sendCommand->getNumMsgs();
pushUlp();
}
| void SCTPAssociation::process_SEND | ( | SCTPEventCode & | event, |
| SCTPCommand * | sctpCommand, | ||
| cPacket * | msg | ||
| ) | [protected] |
Referenced by processAppCommand().
{
SCTPSendCommand* sendCommand = check_and_cast<SCTPSendCommand*>(sctpCommand);
if(fsm->getState() != SCTP_S_ESTABLISHED) {
// TD 12.03.2009: since SCTP_S_ESTABLISHED is the only case, the
// switch(...)-block has been removed for enhanced readability.
sctpEV3 << "process_SEND: state is not SCTP_S_ESTABLISHED -> returning" << endl;
return;
}
sctpEV3 << "process_SEND:"
<< " assocId=" << assocId
<< " localAddr=" << localAddr
<< " remoteAddr=" << remoteAddr
<< " cmdRemoteAddr="<< sendCommand->getRemoteAddr()
<< " cmdPrimary=" << (sendCommand->getPrimary() ? "true" : "false")
<< " appGateIndex=" << appGateIndex
<< " streamId=" << sendCommand->getSid() << endl;
SCTPSimpleMessage* smsg = check_and_cast<SCTPSimpleMessage*>((msg->decapsulate()));
SCTP::AssocStatMap::iterator iter = sctpMain->assocStatMap.find(assocId);
iter->second.sentBytes += smsg->getBitLength() / 8;
// ------ Prepare SCTPDataMsg -----------------------------------------
const uint32 streamId = sendCommand->getSid();
const uint32 sendUnordered = sendCommand->getSendUnordered();
const uint32 ppid = sendCommand->getPpid();
SCTPSendStream* stream = NULL;
SCTPSendStreamMap::iterator associter = sendStreams.find(streamId);
if (associter != sendStreams.end()) {
stream = associter->second;
}
else {
opp_error("stream with id %d not found", streamId);
}
char name[64];
snprintf(name, sizeof(name), "SDATA-%d-%d", streamId, state->msgNum);
smsg->setName(name);
SCTPDataMsg* datMsg = new SCTPDataMsg();
datMsg->encapsulate(smsg);
datMsg->setSid(streamId);
datMsg->setPpid(ppid);
datMsg->setEnqueuingTime(simulation.getSimTime());
// ------ Set initial destination address -----------------------------
if (sendCommand->getPrimary()) {
if (sendCommand->getRemoteAddr() == IPvXAddress("0.0.0.0")) {
datMsg->setInitialDestination(remoteAddr);
}
else {
datMsg->setInitialDestination(sendCommand->getRemoteAddr());
}
}
else {
datMsg->setInitialDestination(state->getPrimaryPathIndex());
}
// ------ Optional padding and size calculations ----------------------
datMsg->setBooksize(smsg->getBitLength() / 8 + state->header);
qCounter.roomSumSendStreams += ADD_PADDING(smsg->getBitLength() / 8 + SCTP_DATA_CHUNK_LENGTH);
qCounter.bookedSumSendStreams += datMsg->getBooksize();
state->sendBuffer += smsg->getByteLength();
datMsg->setMsgNum(++state->msgNum);
// ------ Ordered/Unordered modes -------------------------------------
if (sendUnordered == 1) {
datMsg->setOrdered(false);
stream->getUnorderedStreamQ()->insert(datMsg);
}
else {
datMsg->setOrdered(true);
stream->getStreamQ()->insert(datMsg);
if ((state->appSendAllowed) &&
(state->sendQueueLimit > 0) &&
((uint64)state->sendBuffer >= state->sendQueueLimit) ) {
sendIndicationToApp(SCTP_I_SENDQUEUE_FULL);
state->appSendAllowed = false;
}
sendQueue->record(stream->getStreamQ()->getLength());
}
state->queuedMessages++;
if ((state->queueLimit > 0) && (state->queuedMessages > state->queueLimit)) {
state->queueUpdate = false;
}
sctpEV3 << "process_SEND:"
<< " last=" << sendCommand->getLast()
<<" queueLimit=" << state->queueLimit << endl;
// ------ Call sendCommandInvoked() to send message -------------------
// sendCommandInvoked() itself will call sendOnAllPaths() ...
if (sendCommand->getLast() == true) {
if (sendCommand->getPrimary()) {
sctpAlgorithm->sendCommandInvoked(NULL);
}
else {
sctpAlgorithm->sendCommandInvoked(getPath(datMsg->getInitialDestination()));
}
}
}
| void SCTPAssociation::process_STATUS | ( | SCTPEventCode & | event, |
| SCTPCommand * | sctpCommand, | ||
| cPacket * | msg | ||
| ) | [protected] |
{
SCTPStatusInfo *statusInfo = new SCTPStatusInfo();
statusInfo->setState(fsm->getState());
statusInfo->setStateName(stateName(fsm->getState()));
statusInfo->setPathId(remoteAddr);
statusInfo->setActive(getPath(remoteAddr)->activePath);
msg->setControlInfo(statusInfo);
sendToApp(msg);
}
| void SCTPAssociation::process_TIMEOUT_HEARTBEAT | ( | SCTPPathVariables * | path | ) | [protected] |
Referenced by processTimer().
{
bool oldState;
/* check if error counters must be increased */
if (path->activePath)
{
state->errorCount++;
path->pathErrorCount++;
sctpEV3<<"HB timeout timer expired for path "<<path->remoteAddress<<" --> Increase Error Counters (Assoc: "<<state->errorCount<<", Path: "<<path->pathErrorCount<<")\n";
}
/* RTO must be doubled for this path ! */
path->pathRto = (simtime_t)min(2 * path->pathRto.dbl(), sctpMain->par("rtoMax"));
path->statisticsPathRTO->record(path->pathRto);
/* check if any thresholds are exceeded, and if so, check if ULP must be notified */
if (state->errorCount > (uint32)sctpMain->par("assocMaxRetrans"))
{
sendIndicationToApp(SCTP_I_CONN_LOST);
sendAbort();
sctpMain->removeAssociation(this);
return;
}
else
{
/* set path state to INACTIVE, if the path error counter is exceeded */
if (path->pathErrorCount > (uint32)sctpMain->par("pathMaxRetrans"))
{
oldState = path->activePath;
path->activePath = false;
if (path == state->getPrimaryPath()) {
state->setPrimaryPath(getNextPath(path));
}
sctpEV3 << "pathErrorCount now "<< path->pathErrorCount
<< "; PP now " << state->getPrimaryPathIndex() << endl;
}
/* then: we can check, if all paths are INACTIVE ! */
if (allPathsInactive())
{
sctpEV3<<"sctp_do_hb_to_timer() : ALL PATHS INACTIVE --> closing ASSOC\n";
sendIndicationToApp(SCTP_I_CONN_LOST);
return;
} else if (path->activePath == false && oldState == true)
{
/* notify the application, in case the PATH STATE has changed from ACTIVE to INACTIVE */
pathStatusIndication(path, false);
}
}
}
| void SCTPAssociation::process_TIMEOUT_HEARTBEAT_INTERVAL | ( | SCTPPathVariables * | path, |
| bool | force | ||
| ) | [protected] |
Referenced by processTimer().
{
sctpEV3<<"HB Interval timer expired -- sending new HB REQ on path "<<path->remoteAddress<<"\n";
/* restart hb_send_timer on this path */
stopTimer(path->HeartbeatIntervalTimer);
stopTimer(path->HeartbeatTimer);
path->heartbeatIntervalTimeout = (double)sctpMain->par("hbInterval") + path->pathRto;
path->heartbeatTimeout = path->pathRto;
startTimer(path->HeartbeatIntervalTimer, path->heartbeatIntervalTimeout);
if ((simTime() - path->lastAckTime > path->heartbeatIntervalTimeout/2) || path->forceHb)
{
sendHeartbeat(path);
startTimer(path->HeartbeatTimer, path->heartbeatTimeout);
path->forceHb = false;
}
}
| void SCTPAssociation::process_TIMEOUT_INIT_REXMIT | ( | SCTPEventCode & | event | ) | [protected] |
Referenced by processTimer().
{
if (++state->initRetransCounter > (int32)sctpMain->par("maxInitRetrans"))
{
sctpEV3 << "Retransmission count during connection setup exceeds " << (int32)sctpMain->par("maxInitRetrans") << ", giving up\n";
sendIndicationToApp(SCTP_I_CLOSED);
sendAbort();
sctpMain->removeAssociation(this);
return;
}
sctpEV3<< "Performing retransmission #" << state->initRetransCounter << "\n";
switch(fsm->getState())
{
case SCTP_S_COOKIE_WAIT: retransmitInit(); break;
case SCTP_S_COOKIE_ECHOED: retransmitCookieEcho(); break;
default: opp_error("Internal error: INIT-REXMIT timer expired while in state %s", stateName(fsm->getState()));
}
state->initRexmitTimeout *= 2;
if (state->initRexmitTimeout > SCTP_TIMEOUT_INIT_REXMIT_MAX)
state->initRexmitTimeout = SCTP_TIMEOUT_INIT_REXMIT_MAX;
startTimer(T1_InitTimer,state->initRexmitTimeout);
}
| void SCTPAssociation::process_TIMEOUT_PROBING | ( | ) | [protected] |
| int32 SCTPAssociation::process_TIMEOUT_RTX | ( | SCTPPathVariables * | path | ) | [protected] |
Referenced by processTimer().
{
sctpEV3 << "Processing retransmission timeout ..." << endl;
// ====== Increase the RTO (by doubling it) ==============================
path->pathRto = min(2 * path->pathRto.dbl(), sctpMain->par("rtoMax"));
path->statisticsPathRTO->record(path->pathRto);
sctpEV3 << "Schedule T3 based retransmission for path "<< path->remoteAddress << endl;
// ====== Update congestion window =======================================
(this->*ccFunctions.ccUpdateAfterRtxTimeout)(path);
// ====== Error Counter Handling =========================================
if (!state->zeroWindowProbing) {
state->errorCount++;
path->pathErrorCount++;
sctpEV3 << "RTX-Timeout: errorCount increased to "<<path->pathErrorCount<<" state->errorCount="<<state->errorCount<<"\n";
}
if (state->errorCount >= (uint32)sctpMain->par("assocMaxRetrans")) {
/* error counter exceeded terminate the association -- create an SCTPC_EV_CLOSE event and send it to myself */
sctpEV3 << "process_TIMEOUT_RTX : ASSOC ERROR COUNTER EXCEEDED, closing ASSOC" << endl;
sendIndicationToApp(SCTP_I_CONN_LOST);
sendAbort();
sctpMain->removeAssociation(this);
return 0;
}
else {
if (path->pathErrorCount > (uint32)sctpMain->par("pathMaxRetrans")) {
bool notifyUlp = false;
sctpEV3 << "pathErrorCount exceeded\n";
if (path->activePath) {
/* tell the source */
notifyUlp = true;
}
path->activePath = false;
if (path->remoteAddress == state->getPrimaryPathIndex()) {
SCTPPathVariables* nextPath = getNextPath(path);
if (nextPath != NULL) {
state->setPrimaryPath(nextPath);
}
}
sctpEV3 << "process_TIMEOUT_RTX(" << path->remoteAddress
<< ") : PATH ERROR COUNTER EXCEEDED, path status is INACTIVE" << endl;
if (allPathsInactive()) {
sctpEV3 << "process_TIMEOUT_RTX: ALL PATHS INACTIVE --> connection LOST!" << endl;
sendIndicationToApp(SCTP_I_CONN_LOST);
sendAbort();
sctpMain->removeAssociation(this);
return 0;
}
else if (notifyUlp) {
// Send notification to the application
pathStatusIndication(path, false);
}
}
sctpEV3 << "process_TIMEOUT_RTX(" << path->remoteAddress
<< ") : PATH ERROR COUNTER now " << path->pathErrorCount << endl;
}
// ====== Do Retransmission ==============================================
// dequeue all chunks not acked so far and put them in the TransmissionQ
if (!retransmissionQ->payloadQueue.empty()) {
sctpEV3 << "Still " << retransmissionQ->payloadQueue.size()
<< " chunks in retransmissionQ" << endl;
for (SCTPQueue::PayloadQueue::iterator iterator = retransmissionQ->payloadQueue.begin();
iterator != retransmissionQ->payloadQueue.end(); iterator++) {
SCTPDataVariables* chunk = iterator->second;
assert(chunk != NULL);
// ====== Insert chunks into TransmissionQ ============================
// Only insert chunks that were sent to the path that has timed out
if ( ((chunkHasBeenAcked(chunk) == false && chunk->countsAsOutstanding) || chunk->hasBeenReneged) &&
(chunk->getLastDestinationPath() == path) ) {
sctpEV3 << simTime() << ": Timer-Based RTX for TSN "
<< chunk->tsn << " on path " << chunk->getLastDestination() << endl;
chunk->getLastDestinationPath()->numberOfTimerBasedRetransmissions++;
SCTP::AssocStatMap::iterator iter = sctpMain->assocStatMap.find(assocId);
iter->second.numT3Rtx++;
moveChunkToOtherPath(chunk, getNextDestination(chunk));
}
}
}
SCTPPathVariables* nextPath = getNextPath(path);
sctpEV3 << "TimeoutRTX: sendOnAllPaths()" << endl;
sendOnAllPaths(nextPath);
return 0;
}
| void SCTPAssociation::process_TIMEOUT_SHUTDOWN | ( | SCTPEventCode & | event | ) | [protected] |
Referenced by processTimer().
{
if (++state->errorCount > (uint32)sctpMain->par("assocMaxRetrans"))
{
sendIndicationToApp(SCTP_I_CONN_LOST);
sendAbort();
sctpMain->removeAssociation(this);
return;
}
sctpEV3 << "Performing shutdown retransmission. Assoc error count now "<<state->errorCount<<" \n";
if(fsm->getState() == SCTP_S_SHUTDOWN_SENT)
{
retransmitShutdown();
}
else if (fsm->getState() == SCTP_S_SHUTDOWN_ACK_SENT)
retransmitShutdownAck();
state->initRexmitTimeout *= 2;
if (state->initRexmitTimeout > SCTP_TIMEOUT_INIT_REXMIT_MAX)
state->initRexmitTimeout = SCTP_TIMEOUT_INIT_REXMIT_MAX;
startTimer(T2_ShutdownTimer,state->initRexmitTimeout);
}
| bool SCTPAssociation::processAppCommand | ( | cPacket * | msg | ) |
Process commands from the application. Normally returns true. A return value of false means that the connection structure must be deleted by the caller (SCTP).
Referenced by SCTP::handleMessage().
{
printConnBrief();
// first do actions
SCTPCommand *sctpCommand = (SCTPCommand *)(msg->removeControlInfo());
SCTPEventCode event = preanalyseAppCommandEvent(msg->getKind());
sctpEV3 << "App command: " << eventName(event) << "\n";
switch (event)
{
case SCTP_E_ASSOCIATE: process_ASSOCIATE(event, sctpCommand, msg); break;
case SCTP_E_OPEN_PASSIVE: process_OPEN_PASSIVE(event, sctpCommand, msg); break;
case SCTP_E_SEND: process_SEND(event, sctpCommand, msg); break;
case SCTP_E_CLOSE: process_CLOSE(event); break;
case SCTP_E_ABORT: process_ABORT(event); break;
case SCTP_E_RECEIVE: process_RECEIVE_REQUEST(event, sctpCommand); break;
case SCTP_E_PRIMARY: process_PRIMARY(event, sctpCommand); break;
case SCTP_E_QUEUE_BYTES_LIMIT: process_QUEUE_BYTES_LIMIT(sctpCommand); break;
case SCTP_E_QUEUE_MSGS_LIMIT: process_QUEUE_MSGS_LIMIT(sctpCommand); break;
case SCTP_E_SHUTDOWN: /*sendShutdown*/
sctpEV3<<"SCTP_E_SHUTDOWN in state "<<stateName(fsm->getState())<<"\n";
if (fsm->getState()==SCTP_S_SHUTDOWN_RECEIVED) {
sctpEV3<<"send shutdown ack\n";
sendShutdownAck(remoteAddr);
}
break; //I.R.
case SCTP_E_STOP_SENDING: break;
case SCTP_E_SEND_SHUTDOWN_ACK:
/*if (fsm->getState()==SCTP_S_SHUTDOWN_RECEIVED && getOutstandingBytes()==0
&& qCounter.roomSumSendStreams==0 && transmissionQ->getQueueSize()==0)
{
sendShutdownAck(state->primaryPathIndex);
}*/
break;
default: opp_error("wrong event code");
}
delete sctpCommand;
// then state transitions
return performStateTransition(event);
}
| bool SCTPAssociation::processCookieAckArrived | ( | ) | [protected] |
Referenced by process_RCV_Message().
{
bool trans=false;
if (fsm->getState() == SCTP_S_COOKIE_ECHOED)
{
stopTimer(T1_InitTimer);
trans=performStateTransition(SCTP_E_RCV_COOKIE_ACK);
if (state->cookieChunk->getCookieArraySize()==0)
{
delete state->cookieChunk->getStateCookie();
}
delete state->cookieChunk;
return trans;
}
else
sctpEV3<<"State="<<fsm->getState()<<"\n";
return trans;
}
| bool SCTPAssociation::processCookieEchoArrived | ( | SCTPCookieEchoChunk * | cookieEcho, |
| IPvXAddress | addr | ||
| ) | [protected] |
Referenced by process_RCV_Message().
{
bool trans = false;
SCTPCookie* cookie = check_and_cast<SCTPCookie*>(cookieEcho->getStateCookie());
if (cookie->getCreationTime()+(int32)sctpMain->par("validCookieLifetime")<simTime())
{
sctpEV3<<"stale Cookie: sendAbort\n";
sendAbort();
delete cookie;
return trans;
}
if (fsm->getState() == SCTP_S_CLOSED)
{
if (cookie->getLocalTag()!=localVTag || cookie->getPeerTag() != peerVTag)
{
bool same=true;
for (int32 i=0; i<32; i++)
{
if (cookie->getLocalTieTag(i)!=state->localTieTag[i])
{
same = false;
break;
}
if (cookie->getPeerTieTag(i)!=state->peerTieTag[i])
{
same = false;
break;
}
}
if (!same)
{
sendAbort();
delete cookie;
return trans;
}
}
sctpEV3<<"State is CLOSED, Cookie_Ack has to be sent\n";
trans=performStateTransition(SCTP_E_RCV_VALID_COOKIE_ECHO);
if (trans)
sendCookieAck(addr);//send to address
}
else if (fsm->getState() == SCTP_S_ESTABLISHED || fsm->getState() == SCTP_S_COOKIE_WAIT || fsm->getState() == SCTP_S_COOKIE_ECHOED)
{
sctpEV3<<"State is not CLOSED, but COOKIE_ECHO received. Compare the Tags\n";
// case A: Peer restarted, retrieve information from cookie
if (cookie->getLocalTag()!=localVTag && cookie->getPeerTag() != peerVTag )
{
bool same=true;
for (int32 i=0; i<32; i++)
{
if (cookie->getLocalTieTag(i)!=state->localTieTag[i])
{
same = false;
break;
}
if (cookie->getPeerTieTag(i)!=state->peerTieTag[i])
{
same = false;
break;
}
}
if (same)
{
localVTag = cookie->getLocalTag();
peerVTag = cookie->getPeerTag();
sendCookieAck(addr);
}
}
// case B: Setup collision, retrieve information from cookie
else if (cookie->getPeerTag()==peerVTag && (cookie->getLocalTag()!=localVTag || cookie->getLocalTag()==0))
{
localVTag = cookie->getLocalTag();
peerVTag = cookie->getPeerTag();
performStateTransition(SCTP_E_RCV_VALID_COOKIE_ECHO);
sendCookieAck(addr);
}
else if (cookie->getPeerTag()==peerVTag && cookie->getLocalTag()==localVTag)
{
sendCookieAck(addr); //send to address src
}
trans=true;
}
else
{
sctpEV3<<"State="<<fsm->getState()<<"\n";
trans = true;
}
delete cookie;
return trans;
}
| SCTPEventCode SCTPAssociation::processDataArrived | ( | SCTPDataChunk * | dataChunk | ) | [protected] |
Referenced by process_RCV_Message().
{
bool checkCtsnaChange = false;
const uint32 tsn = dataChunk->getTsn();
SCTPPathVariables* path = getPath(remoteAddr);
state->newChunkReceived = false;
state->lastTsnReceived = tsn;
state->lastDataSourceAddress = remoteAddr;
sctpEV3 << simTime() << " SCTPAssociation::processDataArrived TSN=" << tsn << endl;
path->pathRcvdTSN->record(tsn);
SCTPSimpleMessage* smsg = check_and_cast <SCTPSimpleMessage*>(dataChunk->decapsulate());
dataChunk->setBitLength(SCTP_DATA_CHUNK_LENGTH*8);
dataChunk->encapsulate(smsg);
const uint32 payloadLength = dataChunk->getBitLength()/8-SCTP_DATA_CHUNK_LENGTH;
sctpEV3 << "state->bytesRcvd=" << state->bytesRcvd << endl;
state->bytesRcvd+=payloadLength;
sctpEV3 << "state->bytesRcvd now=" << state->bytesRcvd << endl;
SCTP::AssocStatMap::iterator iter=sctpMain->assocStatMap.find(assocId);
iter->second.rcvdBytes+=dataChunk->getBitLength()/8-SCTP_DATA_CHUNK_LENGTH;
if (state->numGaps == 0) {
state->highestTsnReceived = state->cTsnAck;
}
else {
state->highestTsnReceived = state->gapStopList[state->numGaps-1];
}
if (state->stopReceiving) {
return SCTP_E_IGNORE;
}
if (tsnLe(tsn, state->cTsnAck)) {
sctpEV3 << "Duplicate TSN " << tsn << " (smaller than CumAck)" << endl;
state->dupList.push_back(tsn);
state->dupList.unique();
delete check_and_cast <SCTPSimpleMessage*>(dataChunk->decapsulate());
return SCTP_E_DUP_RECEIVED;
}
if ((int32)(state->localRwnd-state->queuedReceivedBytes) <= 0) {
if (tsnGt(tsn, state->highestTsnReceived)) {
return SCTP_E_IGNORE;
}
// changed 06.07.05 I.R.
else if ((!tsnIsDuplicate(tsn)) &&
(tsn < state->highestTsnStored)) {
if (!makeRoomForTsn(tsn, dataChunk->getBitLength()-SCTP_DATA_CHUNK_LENGTH*8, dataChunk->getUBit())) {
delete check_and_cast <SCTPSimpleMessage*>(dataChunk->decapsulate());
return SCTP_E_IGNORE;
}
}
}
if (tsnGt(tsn, state->highestTsnReceived)) {
sctpEV3 << "highestTsnReceived=" << state->highestTsnReceived
<< " tsn=" << tsn << endl;
state->highestTsnReceived = state->highestTsnStored = tsn;
if (tsn == initPeerTsn) {
state->cTsnAck = tsn;
}
else {
sctpEV3 << "Update fragment list" << endl;
checkCtsnaChange = updateGapList(tsn);
}
state->newChunkReceived = true;
}
else if (tsnIsDuplicate(tsn)) {
// TSN value is duplicate within a fragment
sctpEV3 << "Duplicate TSN " << tsn << " (copy)" << endl;
state->dupList.push_back(tsn);
state->dupList.unique();
return SCTP_E_IGNORE;
}
else {
checkCtsnaChange = updateGapList(tsn);
}
if (state->swsAvoidanceInvoked) {
// swsAvoidanceInvoked => schedule a SACK to be sent at once in this case
sctpEV3 << "swsAvoidanceInvoked" << endl;
state->ackState = sackFrequency;
}
if (checkCtsnaChange) {
advanceCtsna();
}
sctpEV3 << "cTsnAck=" << state->cTsnAck
<< " highestTsnReceived=" << state->highestTsnReceived << endl;
SCTPEventCode event = SCTP_E_SEND;
if (state->newChunkReceived) {
SCTPReceiveStreamMap::iterator iter = receiveStreams.find(dataChunk->getSid());
const int ret = iter->second->enqueueNewDataChunk(makeVarFromMsg(dataChunk));
if (ret > 0) {
state->queuedReceivedBytes+=payloadLength;
event = SCTP_E_DELIVERED;
if (ret < 3) {
sendDataArrivedNotification(dataChunk->getSid());
putInDeliveryQ(dataChunk->getSid());
}
}
state->newChunkReceived = false;
}
return(event);
}
| SCTPEventCode SCTPAssociation::processHeartbeatAckArrived | ( | SCTPHeartbeatAckChunk * | heartbeatack, |
| SCTPPathVariables * | path | ||
| ) | [protected] |
Referenced by process_RCV_Message().
{
path->numberOfHeartbeatAcksRcvd++;
path->pathRcvdHbAck->record(path->numberOfHeartbeatAcksRcvd);
/* hb-ack goes to pathmanagement, reset error counters, stop timeout timer */
const IPvXAddress addr = hback->getRemoteAddr();
const simtime_t hbTimeField = hback->getTimeField();
stopTimer(path->HeartbeatTimer);
/* assume a valid RTT measurement on this path */
simtime_t rttEstimation = simTime() - hbTimeField;
pmRttMeasurement(path, rttEstimation);
pmClearPathCounter(path);
path->confirmed = true;
path->lastAckTime = simTime();
if (path->primaryPathCandidate == true) {
state->setPrimaryPath(getPath(addr));
path->primaryPathCandidate = false;
if (path->pmtu < state->assocPmtu) {
state->assocPmtu = path->pmtu;
}
path->ssthresh = state->peerRwnd;
recordCwndUpdate(path);
path->heartbeatTimeout = (double)sctpMain->par("hbInterval") + path->pathRto;
}
if (path->activePath == false) {
sctpEV3 << "HB ACK arrived activePath=false. remoteAddress=" <<path->remoteAddress
<< " initialPP=" << state->initialPrimaryPath << endl;
path->activePath = true;
if (state->reactivatePrimaryPath && path->remoteAddress == state->initialPrimaryPath) {
state->setPrimaryPath(path);
}
sctpEV3 << "primaryPath now " << state->getPrimaryPathIndex() << endl;
}
sctpEV3 << "Received HB ACK chunk...resetting error counters on path " << addr
<<", rttEstimation=" << rttEstimation << endl;
path->pathErrorCount = 0;
return SCTP_E_IGNORE;
}
| bool SCTPAssociation::processInitAckArrived | ( | SCTPInitAckChunk * | initAckChunk | ) | [protected] |
Referenced by process_RCV_Message().
{
bool trans = false;
if (fsm->getState() == SCTP_S_COOKIE_WAIT)
{
sctpEV3<<"State is COOKIE_WAIT, Cookie_Echo has to be sent\n";
stopTimer(T1_InitTimer);
state->initRexmitTimeout = SCTP_TIMEOUT_INIT_REXMIT;
trans=performStateTransition(SCTP_E_RCV_INIT_ACK);
//delete state->initChunk; will be deleted when state ESTABLISHED is entered
if (trans)
{
state->initialPrimaryPath = remoteAddr;
state->setPrimaryPath(getPath(remoteAddr));
initPeerTsn=initAckChunk->getInitTSN();
localVTag= initAckChunk->getInitTag();
state->cTsnAck = initPeerTsn - 1;
state->initialPeerRwnd = initAckChunk->getA_rwnd();
state->peerRwnd = state->initialPeerRwnd;
remoteAddressList.clear();
numberOfRemoteAddresses =initAckChunk->getAddressesArraySize();
sctpEV3<<"number of remote addresses in initAck="<<numberOfRemoteAddresses<<"\n";
for (uint32 j=0; j<numberOfRemoteAddresses; j++)
{
if (initAckChunk->getAddresses(j).isIPv6())
continue;
for (AddressVector::iterator k=state->localAddresses.begin(); k!=state->localAddresses.end(); ++k)
{
if (!((*k).isUnspecified()))
{
sctpEV3<<"addPath "<<initAckChunk->getAddresses(j)<<"\n";
sctpMain->addRemoteAddress(this,(*k), initAckChunk->getAddresses(j));
this->remoteAddressList.push_back(initAckChunk->getAddresses(j));
addPath(initAckChunk->getAddresses(j));
}
}
}
SCTPPathMap::iterator ite=sctpPathMap.find(remoteAddr);
if (ite==sctpPathMap.end())
{
sctpEV3<<__LINE__<<" get new path for "<<remoteAddr<<"\n";
SCTPPathVariables* path = new SCTPPathVariables(remoteAddr, this);
sctpPathMap[remoteAddr] = path;
qCounter.roomTransQ[remoteAddr] = 0;
qCounter.roomRetransQ[remoteAddr] = 0;
qCounter.bookedTransQ[remoteAddr] = 0;
}
inboundStreams = ((initAckChunk->getNoOutStreams()<inboundStreams)?initAckChunk->getNoOutStreams():inboundStreams);
outboundStreams = ((initAckChunk->getNoInStreams()<outboundStreams)?initAckChunk->getNoInStreams():outboundStreams);
(this->*ssFunctions.ssInitStreams)(inboundStreams, outboundStreams);
sendCookieEcho(initAckChunk);
}
startTimer(T1_InitTimer, state->initRexmitTimeout);
}
else
sctpEV3<<"State="<<fsm->getState()<<"\n";
printSctpPathMap();
return trans;
}
| bool SCTPAssociation::processInitArrived | ( | SCTPInitChunk * | initChunk, |
| int32 | sport, | ||
| int32 | dport | ||
| ) | [protected] |
Process incoming SCTP packets. Invoked from process_RCV_Message
Referenced by process_RCV_Message().
{
SCTPAssociation* assoc;
char timerName[128];
bool trans = false;
InterfaceTableAccess interfaceTableAccess;
AddressVector adv;
sctpEV3<<"processInitArrived\n";
if (fsm->getState() == SCTP_S_CLOSED)
{
sctpEV3<<"fork="<<state->fork<<" initReceived="<<state->initReceived<<"\n";
if (state->fork && !state->initReceived)
{
sctpEV3<<"cloneAssociation\n";
assoc = cloneAssociation();
sctpEV3<<"addForkedAssociation\n";
sctpMain->addForkedAssociation(this, assoc, localAddr, remoteAddr, srcPort, destPort);
sctpEV3 << "Connection forked: this connection got new assocId=" << assocId << ", "
"spinoff keeps LISTENing with assocId=" << assoc->assocId << "\n";
snprintf(timerName, sizeof(timerName), "T2_SHUTDOWN of assoc %d", assocId);
T2_ShutdownTimer->setName(timerName);
snprintf(timerName, sizeof(timerName), "T5_SHUTDOWN_GUARD of assoc %d", assocId);
T5_ShutdownGuardTimer->setName(timerName);
snprintf(timerName, sizeof(timerName), "SACK_TIMER of assoc %d", assocId);
SackTimer->setName(timerName);
snprintf(timerName, sizeof(timerName), "T1_INIT of assoc %d", assocId);
T1_InitTimer->setName(timerName);
}
else
{
sctpMain->updateSockPair(this, localAddr, remoteAddr, srcPort, destPort);
}
if (!state->initReceived)
{
state->initReceived = true;
state->initialPrimaryPath = remoteAddr;
state->setPrimaryPath(getPath(remoteAddr));
if (initchunk->getAddressesArraySize()==0)
{
sctpEV3<<__LINE__<<" get new path for "<<remoteAddr<<"\n";
SCTPPathVariables* rPath = new SCTPPathVariables(remoteAddr, this);
sctpPathMap[rPath->remoteAddress] = rPath;
qCounter.roomTransQ[rPath->remoteAddress] = 0;
qCounter.bookedTransQ[rPath->remoteAddress] = 0;
qCounter.roomRetransQ[rPath->remoteAddress] = 0;
}
initPeerTsn=initchunk->getInitTSN();
state->cTsnAck = initPeerTsn - 1;
state->initialPeerRwnd = initchunk->getA_rwnd();
state->peerRwnd = state->initialPeerRwnd;
localVTag= initchunk->getInitTag();
numberOfRemoteAddresses =initchunk->getAddressesArraySize();
IInterfaceTable *ift = interfaceTableAccess.get();
state->localAddresses.clear();
if (localAddressList.front() == IPvXAddress("0.0.0.0"))
{
for (int32 i=0; i<ift->getNumInterfaces(); ++i)
{
if (ift->getInterface(i)->ipv4Data()!=NULL)
{
adv.push_back(ift->getInterface(i)->ipv4Data()->getIPAddress());
}
else if (ift->getInterface(i)->ipv6Data()!=NULL)
{
adv.push_back(ift->getInterface(i)->ipv6Data()->getAddress(0));
}
}
}
else
{
adv = localAddressList;
}
uint32 rlevel = getLevel(remoteAddr);
if (rlevel>0)
for (AddressVector::iterator i=adv.begin(); i!=adv.end(); ++i)
{
if (getLevel((*i))>=rlevel)
{
sctpMain->addLocalAddress(this, (*i));
state->localAddresses.push_back((*i));
}
}
for (uint32 j=0; j<initchunk->getAddressesArraySize(); j++)
{
// skip IPv6 because we can't send to them yet
if (initchunk->getAddresses(j).isIPv6())
continue;
// set path variables for this pathlocalAddresses
if (!getPath(initchunk->getAddresses(j)))
{
SCTPPathVariables* path = new SCTPPathVariables(initchunk->getAddresses(j), this);
sctpEV3<<__LINE__<<" get new path for "<<initchunk->getAddresses(j)<<" ptr="<<path<<"\n";
for (AddressVector::iterator k=state->localAddresses.begin(); k!=state->localAddresses.end(); ++k)
{
sctpMain->addRemoteAddress(this,(*k), initchunk->getAddresses(j));
this->remoteAddressList.push_back(initchunk->getAddresses(j));
}
sctpPathMap[path->remoteAddress] = path;
qCounter.roomTransQ[path->remoteAddress] = 0;
qCounter.bookedTransQ[path->remoteAddress] = 0;
qCounter.roomRetransQ[path->remoteAddress] = 0;
}
}
SCTPPathMap::iterator ite=sctpPathMap.find(remoteAddr);
if (ite==sctpPathMap.end())
{
SCTPPathVariables* path = new SCTPPathVariables(remoteAddr, this);
sctpEV3<<__LINE__<<" get new path for "<<remoteAddr<<" ptr="<<path<<"\n";
sctpPathMap[remoteAddr] = path;
qCounter.roomTransQ[remoteAddr] = 0;
qCounter.bookedTransQ[remoteAddr] = 0;
qCounter.roomRetransQ[remoteAddr] = 0;
}
trans = performStateTransition(SCTP_E_RCV_INIT);
if (trans) {
sendInitAck(initchunk);
}
}
else if (fsm->getState() == SCTP_S_CLOSED)
{
trans=performStateTransition(SCTP_E_RCV_INIT);
if (trans) {
sendInitAck(initchunk);
}
}
else {
trans = true;
}
}
else if (fsm->getState() == SCTP_S_COOKIE_WAIT) //INIT-Collision
{
sctpEV3<<"INIT collision: send Init-Ack\n";
sendInitAck(initchunk);
trans=true;
}
else if (fsm->getState() == SCTP_S_COOKIE_ECHOED || fsm->getState() == SCTP_S_ESTABLISHED)
{
// check, whether a new address has been added
bool addressPresent = false;
for (uint32 j=0; j<initchunk->getAddressesArraySize(); j++)
{
if (initchunk->getAddresses(j).isIPv6())
continue;
for (AddressVector::iterator k=remoteAddressList.begin(); k!=remoteAddressList.end(); ++k)
{
if ((*k)==(initchunk->getAddresses(j)))
{
addressPresent = true;
break;
}
}
if (!addressPresent)
{
sendAbort();
return true;
}
}
sendInitAck(initchunk);
trans=true;
}
else if (fsm->getState() == SCTP_S_SHUTDOWN_ACK_SENT)
trans = true;
printSctpPathMap();
return trans;
}
| SCTPEventCode SCTPAssociation::processSackArrived | ( | SCTPSackChunk * | sackChunk | ) | [protected] |
Referenced by process_RCV_Message().
{
simtime_t rttEstimation = MAXTIME;
bool ctsnaAdvanced = false;
SCTPPathVariables* path = getPath(remoteAddr); // Path for *this* SACK!
const uint64 arwnd = sackChunk->getA_rwnd();
const uint32 tsna = sackChunk->getCumTsnAck();
uint32 highestNewAck = tsna; // Highest newly acked TSN
const uint16 numGaps = sackChunk->getNumGaps();
bool getChunkFastFirstTime = true;
// ====== Print some information =========================================
sctpEV3 << "##### SACK Processing: TSNa=" << tsna << " #####" << endl;
for(SCTPPathMap::iterator piter = sctpPathMap.begin(); piter != sctpPathMap.end(); piter++) {
SCTPPathVariables* myPath = piter->second;
sctpEV3 << "Path " << myPath->remoteAddress << ":\t"
<< "outstanding=" << path->outstandingBytes << "\t"
<< "T3scheduled=" << path->T3_RtxTimer->getArrivalTime() << " "
<< (path->T3_RtxTimer->isScheduled() ? "[ok]" : "[NOT SCHEDULED]") << "\t"
<< endl;
}
// ====== Initialize some variables ======================================
for(SCTPPathMap::iterator piter = sctpPathMap.begin(); piter != sctpPathMap.end(); piter++) {
SCTPPathVariables* myPath = piter->second;
// T.D. 26.03.09: Remember outstanding bytes before this update
// Values are necessary for updating the congestion window!
myPath->outstandingBytesBeforeUpdate = myPath->outstandingBytes; // T.D. 14.11.09: Bugfix - copy from myPath, not from path!
myPath->requiresRtx = false;
myPath->lowestTSNRetransmitted = false;
myPath->findLowestTSN = true;
myPath->gapAcksInLastSACK = 0;
myPath->gapNAcksInLastSACK = 0;
myPath->newlyAckedBytes = 0;
if (myPath == path) {
myPath->lastAckTime = simTime();
}
}
// ====== Zero Window Probing ============================================
if ( (state->zeroWindowProbing) && (arwnd > 0) ) {
state->zeroWindowProbing = false;
}
// #######################################################################
// #### Processing of CumAck ####
// #######################################################################
if (tsnGt(tsna, state->lastTsnAck)) { // Handle new CumAck
sctpEV3 << "===== Handling new CumAck for TSN " << tsna << " =====" << endl;
// We have got new chunks acked, and our cum ack point is advanced ...
// Depending on the parameter osbWithHeader ackedBytes are with or without the header bytes.
// T.D. 23.03.09: path->newlyAckedBytes is updated in dequeueAckedChunks()!
dequeueAckedChunks(tsna, path, rttEstimation); // chunks with tsn between lastTsnAck and tsna are removed from the transmissionQ and the retransmissionQ; outstandingBytes are decreased
state->lastTsnAck = tsna;
ctsnaAdvanced = true;
}
else if (tsnLt(tsna, state->lastTsnAck)) {
sctpEV3 << "Stale CumAck (" << tsna << " < " << state->lastTsnAck << ")"
<< endl;
return SCTP_E_IGNORE;
}
// ====== Handle reneging ================================================
if ((numGaps == 0) && (tsnLt(tsna, state->highestTsnAcked))) {
// Reneging, type 0:
// In a previous SACK, chunks up to highestTsnAcked have been acked.
// This SACK contains a CumAck < highestTsnAcked
// => rereg TSNs from CumAck+1 to highestTsnAcked
// => new highestTsnAcked = CumAck
sctpEV3 << "numGaps=0 && tsna " << tsna
<< " < highestTsnAcked " << state->highestTsnAcked << endl;
uint32 i = state->highestTsnAcked;
while (i >= tsna + 1) {
SCTPDataVariables* myChunk = retransmissionQ->getChunk(i);
if(myChunk) {
if(chunkHasBeenAcked(myChunk)) {
tsnWasReneged(myChunk,
0);
}
}
i--;
}
state->highestTsnAcked = tsna;
}
// #######################################################################
// #### Processing of GapAcks ####
// #######################################################################
if ((numGaps > 0) && (!retransmissionQ->payloadQueue.empty()) ) {
sctpEV3 << "===== Handling GapAcks after CumTSNAck " << tsna << " =====" << endl;
sctpEV3 << "We got " << numGaps << " gap reports" << endl;
// We got fragment reports... check for newly acked chunks.
const uint32 queuedChunks = retransmissionQ->payloadQueue.size();
sctpEV3 << "Number of chunks in retransmissionQ: " << queuedChunks
<<" highestGapStop: " << sackChunk->getGapStop(numGaps-1)
<<" highestTsnAcked: " << state->highestTsnAcked << endl;
// ====== Handle reneging =============================================
// highest gapStop smaller than highestTsnAcked: there might have been reneging
if (tsnLt(sackChunk->getGapStop(numGaps-1), state->highestTsnAcked)) {
// Reneging, type 2:
// In a previous SACK, chunks up to highestTsnAcked have been acked.
// This SACK contains a last gap ack < highestTsnAcked
// => rereg TSNs from last gap ack to highestTsnAcked
// => new highestTsnAcked = last gap ack
uint32 i = state->highestTsnAcked;
while (i >= sackChunk->getGapStop(numGaps - 1) + 1) {
// ====== Looking up TSN in retransmission queue ================
SCTPDataVariables* myChunk = retransmissionQ->getChunk(i);
if(myChunk) {
if (chunkHasBeenAcked(myChunk)) {
sctpEV3 << "TSN " << i << " was found. It has been un-acked." << endl;
tsnWasReneged(myChunk,
2);
sctpEV3 << "highestTsnAcked now " << state->highestTsnAcked << endl;
}
}
else {
sctpEV3 << "TSN " << i << " not found in retransmissionQ" << endl;
}
i--;
}
state->highestTsnAcked = sackChunk->getGapStop(numGaps - 1);
}
// ====== Looking for changes in the gap reports ======================
sctpEV3 << "Looking for changes in gap reports" << endl;
for (int32 key = 0;key < numGaps; key++) {
const uint32 lo = sackChunk->getGapStart(key);
const uint32 hi = sackChunk->getGapStop(key);
// ====== Iterate over TSNs in gap reports =========================
sctpEV3 << "Examine TSNs between " << lo << " and " << hi << endl;
for (uint32 pos = lo; pos <= hi; pos++) {
SCTPDataVariables* myChunk = retransmissionQ->getChunkFast(pos, getChunkFastFirstTime);
if (myChunk) {
if(chunkHasBeenAcked(myChunk) == false) {
SCTPPathVariables* myChunkLastPath = myChunk->getLastDestinationPath();
assert(myChunkLastPath != NULL);
// T.D. 02.02.2010: This chunk has been acked newly.
// Let's process this new acknowledgement!
handleChunkReportedAsAcked(highestNewAck, rttEstimation, myChunk,
path /* i.e. the SACK path for RTT measurement! */);
}
}
}
}
state->highestTsnAcked = sackChunk->getGapStop(numGaps-1);
// ====== Examine chunks between the gap reports ======================
// They might have to be retransmitted or they could have been removed
uint32 lo = tsna;
for (int32 key = 0; key < numGaps; key++) {
const uint32 hi = sackChunk->getGapStart(key);
for (uint32 pos = lo+1; pos <= hi - 1; pos++) {
SCTPDataVariables* myChunk = retransmissionQ->getChunkFast(pos, getChunkFastFirstTime);
if(myChunk) {
handleChunkReportedAsMissing(sackChunk, highestNewAck, myChunk,
path /* i.e. the SACK path for RTT measurement! */);
}
else {
sctpEV3 << "TSN " << pos << " not found in retransmissionQ" << endl;
}
}
lo = sackChunk->getGapStop(key);
}
// ====== Validity checks =============================================
}
// ====== Update Fast Recovery status, according to SACK =================
updateFastRecoveryStatus(state->lastTsnAck);
// ====== Update RTT measurement for newly acked data chunks =============
sctpEV3 << simTime() << ": SACK: rtt=" << rttEstimation
<< ", path=" << path->remoteAddress << endl;
pmRttMeasurement(path, rttEstimation);
// #######################################################################
// #### Receiver Window Management ####
// #######################################################################
const uint32 osb = getOutstandingBytes();
state->peerRwnd = arwnd - osb;
// position of statement changed 20.07.05 I.R.
if ((int32)(state->peerRwnd) < 0) {
state->peerRwnd = 0;
}
if (state->peerRwnd > state->initialPeerRwnd) {
state->peerRwnd = state->initialPeerRwnd;
}
if (arwnd == 1 || state->peerRwnd < state->swsLimit || arwnd == 0) {
sctpEV3 << "processSackArrived: arwnd=" << arwnd
<< " state->peerRwnd=" << state->peerRwnd
<< " set peerWindowFull" << endl;
state->peerWindowFull = true;
}
else
{
state->peerWindowFull = false;
state->zeroWindowProbing = false;
}
#ifdef PVIATE
advancePeerTsn();
#endif
// ====== Need for zero-window probing? ==================================
if(osb == 0) {
if (arwnd == 0)
state->zeroWindowProbing = true;
}
// #######################################################################
// #### Congestion Window Management ####
// #######################################################################
sctpEV3 << "Before ccUpdateBytesAcked: ";
for(SCTPPathMap::iterator piter = sctpPathMap.begin(); piter != sctpPathMap.end(); piter++) {
SCTPPathVariables* myPath = piter->second;
const IPvXAddress& myPathId = myPath->remoteAddress;
if(myPath->newlyAckedBytes > 0) {
// T.D. 07.10.2009: Only call ccUpdateBytesAcked() when there are
// acked bytes on this path!
const bool advanceWindow = myPath->newCumAck;
sctpEV3 << simTime() << ":\tCC " << myPath->newlyAckedBytes
<< " newly acked on path " << myPathId << ";"
<< "\t->\tadvanceWindow=" << advanceWindow << endl;
(this->*ccFunctions.ccUpdateBytesAcked)(myPath, myPath->newlyAckedBytes, advanceWindow);
}
}
// ====== Update congestion windows on paths (handling decreases) ========
sctpEV3 << "Before ccUpdateAfterSack with tsna=" << tsna << endl;
// ccUpdateAfterSack() will iterate over all paths.
(this->*ccFunctions.ccUpdateAfterSack)();
// #######################################################################
// #### Path Management ####
// #######################################################################
// ====== Need to stop or restart T3 timer? ==============================
for(SCTPPathMap::iterator piter = sctpPathMap.begin(); piter != sctpPathMap.end(); piter++) {
SCTPPathVariables* myPath = piter->second;
const IPvXAddress& myPathId = myPath->remoteAddress;
if (myPath->outstandingBytes == 0) {
// T.D. 07.01.2010: Only stop T3 timer when there is nothing more to send on this path!
if(qCounter.roomTransQ.find(myPath->remoteAddress)->second == 0) {
// Stop T3 timer, if there are no more outstanding bytes.
stopTimer(myPath->T3_RtxTimer);
}
}
else if (myPath->newCumAck) {
stopTimer(myPath->T3_RtxTimer);
startTimer(myPath->T3_RtxTimer, myPath->pathRto);
}
else {
/* Also restart T3 timer, when lowest TSN is rtx'ed */
if(myPath->lowestTSNRetransmitted == true) {
sctpEV3 << "Lowest TSN retransmitted => restart of T3 timer for path "
<< myPathId << endl;
stopTimer(myPath->T3_RtxTimer);
startTimer(myPath->T3_RtxTimer, myPath->pathRto);
}
}
// ====== Clear error counter if TSNs on path have been acked =========
if (myPath->newlyAckedBytes > 0) {
pmClearPathCounter(myPath);
}
}
return SCTP_E_IGNORE;
}
| bool SCTPAssociation::processSCTPMessage | ( | SCTPMessage * | sctpmsg, |
| const IPvXAddress & | srcAddr, | ||
| const IPvXAddress & | destAddr | ||
| ) |
Process incoming SCTP segment. Normally returns true. A return value of false means that the connection structure must be deleted by the caller (SCTP).
Referenced by SCTP::handleMessage().
{
printConnBrief();
localAddr = msgDestAddr;
localPort = sctpmsg->getDestPort();
remoteAddr = msgSrcAddr;
remotePort = sctpmsg->getSrcPort();
return process_RCV_Message(sctpmsg, msgSrcAddr, msgDestAddr);
}
| bool SCTPAssociation::processTimer | ( | cMessage * | msg | ) |
Referenced by SCTP::handleMessage().
{
SCTPPathVariables* path = NULL;
sctpEV3 << msg->getName() << " timer expired at "<<simulation.getSimTime()<<"\n";
SCTPPathInfo* pinfo = check_and_cast<SCTPPathInfo*>(msg->getControlInfo());
IPvXAddress addr = pinfo->getRemoteAddress();
if (addr != IPvXAddress("0.0.0.0"))
path = getPath(addr);
// first do actions
SCTPEventCode event;
event = SCTP_E_IGNORE;
if (msg==T1_InitTimer)
{
process_TIMEOUT_INIT_REXMIT(event);
}
else if (msg==SackTimer)
{
sctpEV3<<simulation.getSimTime()<<" delayed Sack: cTsnAck="<<state->cTsnAck<<" highestTsnReceived="<<state->highestTsnReceived<<" lastTsnReceived="<<state->lastTsnReceived<<" ackState="<<state->ackState<<" numGaps="<<state->numGaps<<"\n";
sendSack();
}
else if (msg==T2_ShutdownTimer)
{
stopTimer(T2_ShutdownTimer);
process_TIMEOUT_SHUTDOWN(event);
}
else if (msg==T5_ShutdownGuardTimer)
{
stopTimer(T5_ShutdownGuardTimer);
delete state->shutdownChunk;
sendIndicationToApp(SCTP_I_CONN_LOST);
sendAbort();
sctpMain->removeAssociation(this);
}
else if (path!=NULL && msg==path->HeartbeatIntervalTimer)
{
process_TIMEOUT_HEARTBEAT_INTERVAL(path, path->forceHb);
}
else if (path!=NULL && msg==path->HeartbeatTimer)
{
process_TIMEOUT_HEARTBEAT(path);
}
else if (path!=NULL && msg==path->T3_RtxTimer)
{
process_TIMEOUT_RTX(path);
}
else if (path!=NULL && msg==path->CwndTimer)
{
(this->*ccFunctions.ccUpdateAfterCwndTimeout)(path);
}
else if (strcmp(msg->getName(), "StartTesting")==0)
{
if (sctpMain->testing == false)
{
sctpMain->testing = true;
sctpEV3<<"set testing to true\n";
}
}
else
{
sctpAlgorithm->processTimer(msg, event);
}
// then state transitions
return performStateTransition(event);
}
| void SCTPAssociation::pushUlp | ( | ) | [protected] |
Referenced by process_RECEIVE_REQUEST().
{
int32 count = 0;
for (unsigned int i = 0; i < inboundStreams; i++) { //12.06.08
putInDeliveryQ(i);
}
if (state->pushMessagesLeft <= 0) {
state->pushMessagesLeft = state->messagesToPush;
}
bool restrict = false;
if (state->pushMessagesLeft > 0) {
restrict = true;
}
sctpEV3 << simTime() << " Calling pushUlp(" << state->queuedReceivedBytes
<< " bytes queued) ..." << endl;
uint32 i = state->nextRSid;
do {
SCTPReceiveStreamMap::iterator iter = receiveStreams.find(i);
SCTPReceiveStream* rStream = iter->second;
sctpEV3 << "Size of stream " << iter->first << ": "
<< rStream->getDeliveryQ()->getQueueSize() << endl;
while ( (!rStream->getDeliveryQ()->payloadQueue.empty()) &&
(!restrict || (restrict && state->pushMessagesLeft>0)) ) {
SCTPDataVariables* chunk = rStream->getDeliveryQ()->extractMessage();
qCounter.roomSumRcvStreams -= ADD_PADDING(chunk->len/8 + SCTP_DATA_CHUNK_LENGTH);
if (state->pushMessagesLeft >0)
state->pushMessagesLeft--;
state->queuedReceivedBytes-=chunk->len/8;
if (state->swsAvoidanceInvoked) {
if ((int32)(state->localRwnd - state->queuedReceivedBytes) >= (int32)(state->swsLimit) &&
(int32)(state->localRwnd - state->queuedReceivedBytes) <= (int32)(state->swsLimit+state->assocPmtu)) {
/* only if the window has opened up more than one MTU we will send a SACK */
state->swsAvoidanceInvoked = false;
sctpEV3<<"pushUlp: Window opens up to "<<(int32)state->localRwnd-state->queuedReceivedBytes<<" bytes: sending a SACK. SWS Avoidance INACTIVE\n";
sendSack();
}
}
else if ((int32)(state->swsLimit) == 0) {
sendSack();
}
sctpEV3 << "Push TSN " << chunk->tsn
<< ": sid=" << chunk->sid << " ssn=" << chunk->ssn << endl;
cPacket* msg= (cPacket *)chunk->userData;
msg->setKind(SCTP_I_DATA);
SCTPRcvCommand *cmd = new SCTPRcvCommand("push");
cmd->setAssocId(assocId);
cmd->setGate(appGateIndex);
cmd->setSid(chunk->sid);
cmd->setSsn(chunk->ssn);
cmd->setSendUnordered(!chunk->ordered);
cmd->setLocalAddr(localAddr);
cmd->setRemoteAddr(remoteAddr);
cmd->setPpid(chunk->ppid);
cmd->setTsn(chunk->tsn);
cmd->setCumTsn(state->lastTsnAck);
msg->setControlInfo(cmd);
state->numMsgsReq[count]--;
delete chunk;
sendToApp(msg);
}
i = (i + 1) % inboundStreams;
count++;
} while (i != state->nextRSid);
state->nextRSid = (state->nextRSid + 1) % inboundStreams;
if ( (state->queuedReceivedBytes == 0) && (fsm->getState() == SCTP_S_SHUTDOWN_ACK_SENT)) {
sctpEV3 << "SCTP_E_CLOSE" << endl;
performStateTransition(SCTP_E_CLOSE);
}
}
| void SCTPAssociation::putInDeliveryQ | ( | uint16 | sid | ) | [protected] |
Referenced by processDataArrived(), and pushUlp().
{
SCTPReceiveStreamMap::iterator iter=receiveStreams.find(sid);
SCTPReceiveStream* rStream = iter->second;
sctpEV3 << "putInDeliveryQ: SSN=" << rStream->getExpectedStreamSeqNum()
<< " SID=" << sid
<< " QueueSize="<< rStream->getOrderedQ()->getQueueSize() << endl;
while (rStream->getOrderedQ()->getQueueSize()>0)
{
/* dequeue first from reassembly Q */
SCTPDataVariables* chunk =
rStream->getOrderedQ()-> dequeueChunkBySSN(rStream->getExpectedStreamSeqNum());
if (chunk) {
sctpEV3 << "putInDeliveryQ::chunk " <<chunk->tsn
<<", sid " << chunk->sid <<" and ssn " << chunk->ssn
<<" dequeued from ordered queue. queuedReceivedBytes="
<< state->queuedReceivedBytes << " will be reduced by "
<< chunk->len/8 << endl;
state->queuedReceivedBytes-=chunk->len/8;
qCounter.roomSumRcvStreams -= ADD_PADDING(chunk->len/8 + SCTP_DATA_CHUNK_LENGTH);
if (rStream->getDeliveryQ()->checkAndInsertChunk(chunk->tsn, chunk)) {
state->queuedReceivedBytes += chunk->len/8;
sctpEV3 << "data put in deliveryQ; queuedBytes now "
<< state->queuedReceivedBytes << endl;
qCounter.roomSumRcvStreams += ADD_PADDING(chunk->len/8 + SCTP_DATA_CHUNK_LENGTH);
int32 seqnum=rStream->getExpectedStreamSeqNum();
rStream->setExpectedStreamSeqNum(++seqnum);
if (rStream->getExpectedStreamSeqNum() > 65535) {
rStream->setExpectedStreamSeqNum(0);
}
sendDataArrivedNotification(sid);
}
}
else {
break;
}
}
}
| void SCTPAssociation::putInTransmissionQ | ( | uint32 | tsn, |
| SCTPDataVariables * | chunk | ||
| ) | [protected] |
| void SCTPAssociation::recordCwndUpdate | ( | SCTPPathVariables * | path | ) | [private] |
Referenced by cwndUpdateAfterCwndTimeout(), cwndUpdateAfterRtxTimeout(), cwndUpdateAfterSack(), cwndUpdateBytesAcked(), cwndUpdateMaxBurst(), initCCParameters(), processHeartbeatAckArrived(), and removePath().
{
path->statisticsPathSSthresh->record(path->ssthresh);
path->statisticsPathCwnd->record(path->cwnd);
}
| void SCTPAssociation::recordInPathVectors | ( | SCTPMessage * | pMsg, |
| const IPvXAddress & | rDest | ||
| ) | [protected] |
Referenced by sendToIP().
{
uint32 n_chunks = pMsg->getChunksArraySize();
if (n_chunks == 0)
return;
SCTPPathVariables* p_path = getPath(rDest);
for (uint32 i = 0 ; i < n_chunks ; i++) {
const SCTPChunk* p_chunk = check_and_cast<const SCTPChunk *>(pMsg->getChunks(i));
if (p_chunk->getChunkType() == DATA) {
const SCTPDataChunk* p_data_chunk = check_and_cast<const SCTPDataChunk *>(p_chunk);
p_path->pathTSN->record(p_data_chunk->getTsn());
} else if (p_chunk->getChunkType() == HEARTBEAT) {
p_path->numberOfHeartbeatsSent++;
p_path->pathHb->record(p_path->numberOfHeartbeatsSent);
} else if (p_chunk->getChunkType() == HEARTBEAT_ACK) {
p_path->numberOfHeartbeatAcksSent++;
p_path->pathHbAck->record(p_path->numberOfHeartbeatAcksSent);
}
}
}
| void SCTPAssociation::removeFromGapList | ( | const uint32 | removedTsn | ) | [protected] |
Referenced by makeRoomForTsn().
{
int32 gapsize, numgaps;
numgaps = state->numGaps;
sctpEV3<<"remove TSN "<<removedTsn<<" from GapList. "<<numgaps<<" gaps present, cumTsnAck="<<state->cTsnAck<<"\n";
for (int32 j=0; j<numgaps; j++)
sctpEV3<<state->gapStartList[j]<<" - "<<state->gapStopList[j]<<"\n";
for (int32 i=numgaps-1; i>=0; i--)
{
sctpEV3<<"gapStartList["<<i<<"]="<<state->gapStartList[i]<<", state->gapStopList["<<i<<"]="<<state->gapStopList[i]<<"\n";
if (tsnBetween(state->gapStartList[i], removedTsn, state->gapStopList[i]))
{
gapsize = (int32)(state->gapStopList[i] - state->gapStartList[i]+1);
if (gapsize>1)
{
if (state->gapStopList[i]==removedTsn)
{
state->gapStopList[i]--;
}
else if (state->gapStartList[i]==removedTsn)
{
state->gapStartList[i]++;
}
else //gap is split in two
{
for (int32 j=numgaps-1; j>=i; j--)
{
state->gapStopList[j+1] = state->gapStopList[j];
state->gapStartList[j+1] = state->gapStartList[j];
}
state->gapStopList[i] = removedTsn-1;
state->gapStartList[i+1] = removedTsn+1;
state->numGaps = min(state->numGaps + 1, MAX_GAP_COUNT); // T.D. 18.12.09: Enforce upper limit!
}
}
else
{
for (int32 j=i; j<=numgaps-1; j++)
{
state->gapStopList[j] = state->gapStopList[j+1];
state->gapStartList[j] = state->gapStartList[j+1];
}
state->gapStartList[numgaps-1]=0;
state->gapStopList[numgaps-1]=0;
state->numGaps--;
if (state->numGaps == 0)
{
if (removedTsn == state->lastTsnAck+1)
{
state->lastTsnAck = removedTsn;
}
}
}
}
}
if (state->numGaps>0)
state->highestTsnReceived = state->gapStopList[state->numGaps-1];
else
state->highestTsnReceived = state->cTsnAck;
}
| void SCTPAssociation::removePath | ( | ) |
Referenced by SCTP::removeAssociation().
{
SCTPPathMap::iterator pathIterator;
while((pathIterator = sctpPathMap.begin()) != sctpPathMap.end())
{
SCTPPathVariables* path = pathIterator->second;
sctpEV3 << getFullPath() << " remove path " << path->remoteAddress << endl;
stopTimer(path->HeartbeatTimer);
delete path->HeartbeatTimer;
stopTimer(path->HeartbeatIntervalTimer);
sctpEV3 << "delete timer " << path->HeartbeatIntervalTimer->getName() << endl;
delete path->HeartbeatIntervalTimer;
stopTimer(path->T3_RtxTimer);
delete path->T3_RtxTimer;
stopTimer(path->CwndTimer);
delete path->CwndTimer;
delete path;
sctpPathMap.erase(pathIterator);
}
}
| void SCTPAssociation::removePath | ( | const IPvXAddress & | addr | ) |
{
SCTPPathMap::iterator pathIterator = sctpPathMap.find(addr);
if (pathIterator != sctpPathMap.end())
{
SCTPPathVariables* path = pathIterator->second;
path->cwnd = 0;
path->ssthresh = 0;
recordCwndUpdate(path);
stopTimer(path->HeartbeatTimer);
delete path->HeartbeatTimer;
stopTimer(path->HeartbeatIntervalTimer);
delete path->HeartbeatIntervalTimer;
stopTimer(path->T3_RtxTimer);
delete path->T3_RtxTimer;
stopTimer(path->CwndTimer);
delete path->CwndTimer;
sctpPathMap.erase(pathIterator);
delete path;
}
}
| void SCTPAssociation::retransmitCookieEcho | ( | ) | [protected] |
Referenced by process_RCV_Message(), and process_TIMEOUT_INIT_REXMIT().
{
SCTPMessage* sctpmsg = new SCTPMessage();
sctpmsg->setBitLength(SCTP_COMMON_HEADER*8);
SCTPCookieEchoChunk* cookieEchoChunk=check_and_cast<SCTPCookieEchoChunk*>(state->cookieChunk->dup());
if (cookieEchoChunk->getCookieArraySize()==0)
{
cookieEchoChunk->setStateCookie(state->cookieChunk->getStateCookie()->dup());
}
sctpmsg->addChunk(cookieEchoChunk);
sctpEV3<<"retransmitCookieEcho localAddr="<<localAddr<<" remoteAddr"<<remoteAddr<<"\n";
sendToIP(sctpmsg);
}
| void SCTPAssociation::retransmitInit | ( | ) | [protected] |
Retransmitting chunks
Referenced by process_RCV_Message(), and process_TIMEOUT_INIT_REXMIT().
{
SCTPMessage *sctpmsg = new SCTPMessage();
sctpmsg->setBitLength(SCTP_COMMON_HEADER*8);
SCTPInitChunk *sctpinit;// = new SCTPInitChunk("INIT");
sctpEV3<<"Retransmit InitChunk="<<&sctpinit<<"\n";
sctpinit=check_and_cast<SCTPInitChunk *>(state->initChunk->dup());
sctpinit->setChunkType(INIT);
sctpmsg->addChunk(sctpinit);
sendToIP(sctpmsg);
}
| void SCTPAssociation::retransmitShutdown | ( | ) | [protected] |
Referenced by process_TIMEOUT_SHUTDOWN().
{
SCTPMessage *sctpmsg = new SCTPMessage();
sctpmsg->setBitLength(SCTP_COMMON_HEADER*8);
SCTPShutdownChunk* shutdownChunk;
shutdownChunk=check_and_cast<SCTPShutdownChunk*>(state->shutdownChunk->dup());
sctpmsg->addChunk(shutdownChunk);
sctpEV3<<"retransmitShutdown localAddr="<<localAddr<<" remoteAddr"<<remoteAddr<<"\n";
sendToIP(sctpmsg);
}
| void SCTPAssociation::retransmitShutdownAck | ( | ) | [protected] |
Referenced by process_TIMEOUT_SHUTDOWN().
{
SCTPMessage *sctpmsg = new SCTPMessage();
sctpmsg->setBitLength(SCTP_COMMON_HEADER*8);
SCTPShutdownAckChunk* shutdownAckChunk;
shutdownAckChunk=check_and_cast<SCTPShutdownAckChunk*>(state->shutdownAckChunk->dup());
sctpmsg->addChunk(shutdownAckChunk);
sctpEV3<<"retransmitShutdownAck localAddr="<<localAddr<<" remoteAddr"<<remoteAddr<<"\n";
sendToIP(sctpmsg);
}
| void SCTPAssociation::scheduleSack | ( | ) | [protected] |
Referenced by process_RCV_Message().
{
/* increase SACK counter, we received another data PACKET */
if (state->firstChunkReceived)
state->ackState++;
else
{
state->ackState = sackFrequency;
state->firstChunkReceived = true;
}
sctpEV3<<"scheduleSack() : ackState is now: "<<state->ackState<<"\n";
if (state->ackState <= sackFrequency - 1)
{
/* start a SACK timer if none is running, to expire 200 ms (or parameter) from now */
if (!SackTimer->isScheduled())
{
startTimer(SackTimer, sackPeriod);
}
/* else: leave timer running, and do nothing... */ else {
/* is this possible at all ? Check this... */
sctpEV3<<"SACK timer running, but scheduleSack() called\n";
}
}
}
| void SCTPAssociation::scheduleTimeout | ( | cMessage * | msg, |
| const simtime_t & | timeout | ||
| ) | [inline, protected] |
Utility: start a timer
Referenced by SCTPAssociation(), and startTimer().
{
sctpMain->scheduleAt(simulation.getSimTime() + timeout, msg);
}
| void SCTPAssociation::sendAbort | ( | ) | [protected] |
Referenced by process_ABORT(), process_RCV_Message(), process_TIMEOUT_HEARTBEAT(), process_TIMEOUT_INIT_REXMIT(), process_TIMEOUT_RTX(), process_TIMEOUT_SHUTDOWN(), processCookieEchoArrived(), processInitArrived(), processTimer(), and updateCounters().
{
SCTPMessage *msg = new SCTPMessage();
msg->setBitLength(SCTP_COMMON_HEADER*8);
sctpEV3<<"SCTPAssociationUtil:sendABORT localPort="<<localPort<<" remotePort="<<remotePort<<"\n";
msg->setSrcPort(localPort);
msg->setDestPort(remotePort);
SCTPAbortChunk* abortChunk = new SCTPAbortChunk("ABORT");
abortChunk->setChunkType(ABORT);
abortChunk->setT_Bit(0);
abortChunk->setBitLength(SCTP_ABORT_CHUNK_LENGTH*8);
msg->addChunk(abortChunk);
sendToIP(msg, remoteAddr);
}
| void SCTPAssociation::sendCookieAck | ( | const IPvXAddress & | dest | ) | [protected] |
Referenced by processCookieEchoArrived().
{
SCTPMessage *sctpcookieack = new SCTPMessage();
sctpcookieack->setBitLength(SCTP_COMMON_HEADER*8);
sctpEV3<<"SCTPAssociationUtil:sendCookieACK\n";
sctpcookieack->setSrcPort(localPort);
sctpcookieack->setDestPort(remotePort);
SCTPCookieAckChunk* cookieAckChunk=new SCTPCookieAckChunk("COOKIE_ACK");
cookieAckChunk->setChunkType(COOKIE_ACK);
cookieAckChunk->setBitLength(SCTP_COOKIE_ACK_LENGTH*8);
sctpcookieack->addChunk(cookieAckChunk);
sendToIP(sctpcookieack, dest);
}
| void SCTPAssociation::sendCookieEcho | ( | SCTPInitAckChunk * | initackchunk | ) | [protected] |
Referenced by processInitAckArrived().
{
SCTPMessage *sctpcookieecho = new SCTPMessage();
sctpcookieecho->setBitLength(SCTP_COMMON_HEADER*8);
sctpEV3<<"SCTPAssociationUtil:sendCookieEcho\n";
sctpcookieecho->setSrcPort(localPort);
sctpcookieecho->setDestPort(remotePort);
SCTPCookieEchoChunk* cookieEchoChunk=new SCTPCookieEchoChunk("COOKIE_ECHO");
cookieEchoChunk->setChunkType(COOKIE_ECHO);
int32 len = initAckChunk->getCookieArraySize();
cookieEchoChunk->setCookieArraySize(len);
if (len>0)
{
for (int32 i=0; i<len; i++)
cookieEchoChunk->setCookie(i, initAckChunk->getCookie(i));
cookieEchoChunk->setBitLength((SCTP_COOKIE_ACK_LENGTH+len)*8);
}
else
{
SCTPCookie* cookie = check_and_cast <SCTPCookie*> (initAckChunk->getStateCookie());
cookieEchoChunk->setStateCookie(cookie);
cookieEchoChunk->setBitLength(SCTP_COOKIE_ACK_LENGTH*8 + cookie->getBitLength());
}
uint32 unknownLen = initAckChunk->getUnrecognizedParametersArraySize();
if (unknownLen>0)
{
sctpEV3<<"Found unrecognized Parameters in INIT-ACK chunk with a length of "<<unknownLen<<" bytes.\n";
cookieEchoChunk->setUnrecognizedParametersArraySize(unknownLen);
for (uint32 i=0; i<unknownLen; i++)
cookieEchoChunk->setUnrecognizedParameters(i,initAckChunk->getUnrecognizedParameters(i));
}
else
cookieEchoChunk->setUnrecognizedParametersArraySize(0);
state->cookieChunk=check_and_cast<SCTPCookieEchoChunk*>(cookieEchoChunk->dup());
if (len==0)
{
state->cookieChunk->setStateCookie(initAckChunk->getStateCookie()->dup());
}
sctpcookieecho->addChunk(cookieEchoChunk);
sendToIP(sctpcookieecho);
}
| void SCTPAssociation::sendDataArrivedNotification | ( | uint16 | sid | ) | [protected] |
Referenced by processDataArrived(), and putInDeliveryQ().
{
sctpEV3<<"SendDataArrivedNotification\n";
cPacket* cmsg = new cPacket("DataArrivedNotification");
cmsg->setKind(SCTP_I_DATA_NOTIFICATION);
SCTPCommand *cmd = new SCTPCommand("notification");
cmd->setAssocId(assocId);
cmd->setSid(sid);
cmd->setNumMsgs(1);
cmsg->setControlInfo(cmd);
sendToApp(cmsg);
}
| void SCTPAssociation::sendEstabIndicationToApp | ( | ) | [protected] |
Utility: sends SCTP_I_ESTABLISHED indication with SCTPConnectInfo to application
Referenced by stateEntered().
{
sctpEV3 << "sendEstabIndicationToApp: localPort="
<< localPort << " remotePort=" << remotePort << endl;
cPacket* msg = new cPacket(indicationName(SCTP_I_ESTABLISHED));
msg->setKind(SCTP_I_ESTABLISHED);
SCTPConnectInfo* establishIndication = new SCTPConnectInfo("CI");
establishIndication->setAssocId(assocId);
establishIndication->setLocalAddr(localAddr);
establishIndication->setRemoteAddr(remoteAddr);
establishIndication->setLocalPort(localPort);
establishIndication->setRemotePort(remotePort);
establishIndication->setRemoteAddresses(remoteAddressList);
establishIndication->setInboundStreams(inboundStreams);
establishIndication->setOutboundStreams(outboundStreams);
establishIndication->setNumMsgs(state->sendQueueLimit);
msg->setControlInfo(establishIndication);
sctpMain->send(msg, "to_appl", appGateIndex);
}
| void SCTPAssociation::sendHeartbeat | ( | const SCTPPathVariables * | path | ) | [protected] |
Referenced by pmStartPathManagement(), and process_TIMEOUT_HEARTBEAT_INTERVAL().
{
SCTPMessage* sctpHeartbeatbeat = new SCTPMessage();
sctpHeartbeatbeat->setBitLength(SCTP_COMMON_HEADER*8);
sctpHeartbeatbeat->setSrcPort(localPort);
sctpHeartbeatbeat->setDestPort(remotePort);
SCTPHeartbeatChunk* heartbeatChunk = new SCTPHeartbeatChunk("HEARTBEAT");
heartbeatChunk->setChunkType(HEARTBEAT);
heartbeatChunk->setRemoteAddr(path->remoteAddress);
heartbeatChunk->setTimeField(simTime());
heartbeatChunk->setBitLength((SCTP_HEARTBEAT_CHUNK_LENGTH+12)*8);
sctpHeartbeatbeat->addChunk(heartbeatChunk);
sctpEV3 << "sendHeartbeat: sendToIP to " << path->remoteAddress << endl;
sendToIP(sctpHeartbeatbeat, path->remoteAddress);
}
| void SCTPAssociation::sendHeartbeatAck | ( | const SCTPHeartbeatChunk * | heartbeatChunk, |
| const IPvXAddress & | src, | ||
| const IPvXAddress & | dest | ||
| ) | [protected] |
Referenced by process_RCV_Message().
{
SCTPMessage* sctpHeartbeatAck = new SCTPMessage();
sctpHeartbeatAck->setBitLength(SCTP_COMMON_HEADER*8);
sctpHeartbeatAck->setSrcPort(localPort);
sctpHeartbeatAck->setDestPort(remotePort);
SCTPHeartbeatAckChunk* heartbeatAckChunk=new SCTPHeartbeatAckChunk("HEARTBEAT_ACK");
heartbeatAckChunk->setChunkType(HEARTBEAT_ACK);
heartbeatAckChunk->setRemoteAddr(heartbeatChunk->getRemoteAddr());
heartbeatAckChunk->setTimeField(heartbeatChunk->getTimeField());
const int32 len = heartbeatChunk->getInfoArraySize();
if (len > 0){
heartbeatAckChunk->setInfoArraySize(len);
for (int32 i=0; i<len; i++)
heartbeatAckChunk->setInfo(i,heartbeatChunk->getInfo(i));
}
heartbeatAckChunk->setBitLength(heartbeatChunk->getBitLength());
sctpHeartbeatAck->addChunk(heartbeatAckChunk);
sctpEV3 << "sendHeartbeatAck: sendToIP from " << src << " to " << dest << endl;
sendToIP(sctpHeartbeatAck, dest);
}
| void SCTPAssociation::sendIndicationToApp | ( | const int32 | code, |
| const int32 | value = 0 |
||
| ) | [protected] |
Utility: sends status indication (SCTP_I_xxx) to application
Referenced by dequeueAckedChunks(), process_RCV_Message(), process_SEND(), process_TIMEOUT_HEARTBEAT(), process_TIMEOUT_INIT_REXMIT(), process_TIMEOUT_RTX(), process_TIMEOUT_SHUTDOWN(), processTimer(), sendOnPath(), signalConnectionTimeout(), stateEntered(), and updateCounters().
{
sctpEV3<<"sendIndicationToApp: " << indicationName(code) << endl;
cPacket* msg = new cPacket(indicationName(code));
msg->setKind(code);
SCTPCommand* indication = new SCTPCommand(indicationName(code));
indication->setAssocId(assocId);
indication->setLocalAddr(localAddr);
indication->setRemoteAddr(remoteAddr);
if (code == SCTP_I_SENDQUEUE_ABATED) {
indication->setNumMsgs(value);
}
msg->setControlInfo(indication);
sctpMain->send(msg, "to_appl", appGateIndex);
}
| void SCTPAssociation::sendInit | ( | ) | [protected] |
Methods for creating and sending chunks
Referenced by process_ASSOCIATE().
{
//RoutingTableAccess routingTableAccess;
InterfaceTableAccess interfaceTableAccess;
AddressVector adv;
uint32 length = SCTP_INIT_CHUNK_LENGTH;
if (remoteAddr.isUnspecified() || remotePort==0)
opp_error("Error processing command ASSOCIATE: foreign socket unspecified");
if (localPort==0)
opp_error("Error processing command ASSOCIATE: local port unspecified");
state->setPrimaryPath(getPath(remoteAddr));
// create message consisting of INIT chunk
SCTPMessage *sctpmsg = new SCTPMessage();
sctpmsg->setBitLength(SCTP_COMMON_HEADER*8);
SCTPInitChunk *initChunk = new SCTPInitChunk("INIT");
initChunk->setChunkType(INIT);
initChunk->setInitTag((uint32)(fmod(intrand(INT32_MAX), 1.0+(double)(unsigned)0xffffffffUL)) & 0xffffffffUL);
peerVTag = initChunk->getInitTag();
sctpEV3<<"INIT from "<<localAddr<<":InitTag="<<peerVTag<<"\n";
initChunk->setA_rwnd(sctpMain->par("arwnd"));
state->localRwnd = (long)sctpMain->par("arwnd");
initChunk->setNoOutStreams(outboundStreams);
initChunk->setNoInStreams(inboundStreams);
initChunk->setInitTSN(1000);
state->nextTSN=initChunk->getInitTSN();
state->lastTSN = initChunk->getInitTSN() + state->numRequests - 1;
initTsn=initChunk->getInitTSN();
IInterfaceTable *ift = interfaceTableAccess.get();
sctpEV3<<"add local address\n";
if (localAddressList.front() == IPvXAddress("0.0.0.0"))
{
for (int32 i=0; i<ift->getNumInterfaces(); ++i)
{
if (ift->getInterface(i)->ipv4Data()!=NULL)
{
adv.push_back(ift->getInterface(i)->ipv4Data()->getIPAddress());
}
else if (ift->getInterface(i)->ipv6Data()!=NULL)
{
for (int32 j=0; j<ift->getInterface(i)->ipv6Data()->getNumAddresses(); j++)
{
sctpEV3<<"add address "<<ift->getInterface(i)->ipv6Data()->getAddress(j)<<"\n";
adv.push_back(ift->getInterface(i)->ipv6Data()->getAddress(j));
}
}
}
}
else
{
adv = localAddressList;
sctpEV3<<"gebundene Adresse "<<localAddr<<" wird hinzugefuegt\n";
}
uint32 addrNum=0;
bool friendly = false;
if (remoteAddr.isIPv6())
{
for (AddressVector::iterator i=adv.begin(); i!=adv.end(); ++i)
{
if (!friendly)
{
initChunk->setAddressesArraySize(addrNum+1);
initChunk->setAddresses(addrNum++,(*i));
length+=20;
}
sctpMain->addLocalAddress(this, (*i));
state->localAddresses.push_back((*i));
if (localAddr.isUnspecified())
localAddr=(*i);
}
}
else
{
uint32 rlevel = getLevel(remoteAddr);
sctpEV3<<"level of remote address="<<rlevel<<"\n";
for (AddressVector::iterator i=adv.begin(); i!=adv.end(); ++i)
{
sctpEV3<<"level of address "<<(*i)<<" = "<<getLevel((*i))<<"\n";
if (getLevel((*i))>=rlevel)
{
initChunk->setAddressesArraySize(addrNum+1);
initChunk->setAddresses(addrNum++,(*i));
length+=8;
sctpMain->addLocalAddress(this, (*i));
state->localAddresses.push_back((*i));
if (localAddr.get4().getInt()==0)
localAddr=(*i);
}
else if (rlevel==4 && getLevel((*i))==3 && friendly)
{
sctpMain->addLocalAddress(this, (*i));
state->localAddresses.push_back((*i));
if (localAddr.get4().getInt()==0)
localAddr=(*i);
}
}
}
sctpMain->printInfoConnMap();
initChunk->setBitLength(length*8);
sctpmsg->addChunk(initChunk);
// set path variables
if (remoteAddressList.size()>0)
{
for (AddressVector::iterator it=remoteAddressList.begin(); it!=remoteAddressList.end(); it++)
{
sctpEV3<<__LINE__<<" get new path for "<<(*it)<<"\n";
SCTPPathVariables* path = new SCTPPathVariables((*it), this);
sctpPathMap[(*it)] = path;
qCounter.roomTransQ[(*it)] = 0;
qCounter.bookedTransQ[(*it)] = 0;
qCounter.roomRetransQ[(*it)] = 0;
}
}
else
{
sctpEV3<<__LINE__<<" get new path for "<<remoteAddr<<"\n";
SCTPPathVariables* path = new SCTPPathVariables(remoteAddr, this);
sctpPathMap[remoteAddr] = path;
qCounter.roomTransQ[remoteAddr] = 0;
qCounter.bookedTransQ[remoteAddr] = 0;
qCounter.roomRetransQ[remoteAddr] = 0;
}
// send it
state->initChunk=check_and_cast<SCTPInitChunk *>(initChunk->dup());
state->initChunk->setName("StateInitChunk");
printSctpPathMap();
sctpEV3<<getFullPath()<<" sendInit: localVTag="<<localVTag<<" peerVTag="<<peerVTag<<"\n";
sendToIP(sctpmsg);
sctpMain->assocList.push_back(this);
}
| void SCTPAssociation::sendInitAck | ( | SCTPInitChunk * | initchunk | ) | [protected] |
Referenced by processInitArrived().
{
uint32 length = SCTP_INIT_CHUNK_LENGTH;
state->setPrimaryPath(getPath(remoteAddr));
// create segment
SCTPMessage *sctpinitack = new SCTPMessage();
sctpinitack->setBitLength(SCTP_COMMON_HEADER*8);
sctpinitack->setSrcPort(localPort);
sctpinitack->setDestPort(remotePort);
sctpEV3<<"sendInitAck at "<<localAddr<<". Provided InitTag="<<initChunk->getInitTag()<<"\n";
SCTPInitAckChunk *initAckChunk = new SCTPInitAckChunk("INIT_ACK");
initAckChunk->setChunkType(INIT_ACK);
SCTPCookie *cookie = new SCTPCookie("CookieUtil");
cookie->setCreationTime(simTime());
cookie->setLocalTieTagArraySize(32);
cookie->setPeerTieTagArraySize(32);
if (fsm->getState()==SCTP_S_CLOSED)
{
while (peerVTag==0)
{
peerVTag = (uint32)intrand(INT32_MAX);
}
initAckChunk->setInitTag(peerVTag);
initAckChunk->setInitTSN(2000);
state->nextTSN=initAckChunk->getInitTSN();
state->lastTSN = initAckChunk->getInitTSN() + state->numRequests - 1;
cookie->setLocalTag(localVTag);
cookie->setPeerTag(peerVTag);
for (int32 i=0; i<32; i++)
{
cookie->setLocalTieTag(i,0);
cookie->setPeerTieTag(i,0);
}
sctpinitack->setTag(localVTag);
sctpEV3<<"state=closed: localVTag="<<localVTag<<" peerVTag="<<peerVTag<<"\n";
}
else if (fsm->getState()==SCTP_S_COOKIE_WAIT || fsm->getState()==SCTP_S_COOKIE_ECHOED)
{
initAckChunk->setInitTag(peerVTag);
sctpEV3<<"different state:set InitTag in InitAck: "<<initAckChunk->getInitTag()<<"\n";
initAckChunk->setInitTSN(state->nextTSN);
initPeerTsn=initChunk->getInitTSN();
state->cTsnAck = initPeerTsn - 1;
cookie->setLocalTag(initChunk->getInitTag());
cookie->setPeerTag(peerVTag);
for (int32 i=0; i<32; i++)
{
cookie->setPeerTieTag(i,(uint8)(intrand(256)));
state->peerTieTag[i] = cookie->getPeerTieTag(i);
if (fsm->getState()==SCTP_S_COOKIE_ECHOED)
{
cookie->setLocalTieTag(i,(uint8)(intrand(256)));
state->localTieTag[i] = cookie->getLocalTieTag(i);
}
else
cookie->setLocalTieTag(i,0);
}
sctpinitack->setTag(initChunk->getInitTag());
sctpEV3<<"VTag in InitAck: "<<sctpinitack->getTag()<<"\n";
}
else
{
sctpEV3<<"other state\n";
uint32 tag=0;
while (tag==0)
{
tag = (uint32)(fmod(intrand(INT32_MAX), 1.0+(double)(unsigned)0xffffffffUL)) & 0xffffffffUL;
}
initAckChunk->setInitTag(tag);
initAckChunk->setInitTSN(state->nextTSN);
cookie->setLocalTag(localVTag);
cookie->setPeerTag(peerVTag);
for (int32 i=0; i<32; i++)
{
cookie->setPeerTieTag(i,state->peerTieTag[i]);
cookie->setLocalTieTag(i,state->localTieTag[i]);
}
sctpinitack->setTag(initChunk->getInitTag());
}
cookie->setBitLength(SCTP_COOKIE_LENGTH*8);
initAckChunk->setStateCookie(cookie);
initAckChunk->setCookieArraySize(0);
initAckChunk->setA_rwnd(sctpMain->par("arwnd"));
state->localRwnd = (long)sctpMain->par("arwnd");
initAckChunk->setNoOutStreams((unsigned int)min(outboundStreams,initChunk->getNoInStreams()));
initAckChunk->setNoInStreams((unsigned int)min(inboundStreams,initChunk->getNoOutStreams()));
initTsn=initAckChunk->getInitTSN();
uint32 addrNum=0;
bool friendly = false;
if (!friendly)
for (AddressVector::iterator k=state->localAddresses.begin(); k!=state->localAddresses.end(); ++k)
{
initAckChunk->setAddressesArraySize(addrNum+1);
initAckChunk->setAddresses(addrNum++,(*k));
length+=8;
}
uint32 unknownLen = initChunk->getUnrecognizedParametersArraySize();
if (unknownLen>0)
{
sctpEV3<<"Found unrecognized Parameters in INIT chunk with a length of "<<unknownLen<<" bytes.\n";
initAckChunk->setUnrecognizedParametersArraySize(unknownLen);
for (uint32 i=0; i<unknownLen; i++)
initAckChunk->setUnrecognizedParameters(i,initChunk->getUnrecognizedParameters(i));
length+=unknownLen;
}
else
initAckChunk->setUnrecognizedParametersArraySize(0);
initAckChunk->setBitLength((length+initAckChunk->getCookieArraySize())*8 + cookie->getBitLength());
inboundStreams = ((initChunk->getNoOutStreams()<initAckChunk->getNoInStreams())?initChunk->getNoOutStreams():initAckChunk->getNoInStreams());
outboundStreams = ((initChunk->getNoInStreams()<initAckChunk->getNoOutStreams())?initChunk->getNoInStreams():initAckChunk->getNoOutStreams());
(this->*ssFunctions.ssInitStreams)(inboundStreams, outboundStreams);
sctpinitack->addChunk(initAckChunk);
if (fsm->getState()==SCTP_S_CLOSED)
{
sendToIP(sctpinitack, state->initialPrimaryPath);
}
else
{
sendToIP(sctpinitack);
}
sctpMain->assocList.push_back(this);
printSctpPathMap();
}
| void SCTPAssociation::sendOnAllPaths | ( | SCTPPathVariables * | firstPath | ) |
Referenced by process_ABORT(), process_CLOSE(), process_RCV_Message(), process_TIMEOUT_RTX(), sendShutdownAck(), and stateEntered().
{
// ------ Send on provided path first ... -----------------------------
if(firstPath != NULL) {
sendOnPath(firstPath);
}
// ------ ... then, try sending on all other paths --------------------
for (SCTPPathMap::iterator iterator = sctpPathMap.begin(); iterator != sctpPathMap.end(); ++iterator) {
SCTPPathVariables* path = iterator->second;
if(path != firstPath) {
sendOnPath(path);
}
}
}
| void SCTPAssociation::sendOnPath | ( | SCTPPathVariables * | pathId, |
| const bool | firstPass = true |
||
| ) |
Utility: Send data from sendQueue.
Referenced by SCTPAlg::sendCommandInvoked(), and sendOnAllPaths().
{
// ====== Variables ======================================================
SCTPPathVariables* path = NULL; // Path to send next message to
SCTPMessage* sctpMsg = NULL;
SCTPSackChunk* sackChunk = NULL;
SCTPDataChunk* chunkPtr = NULL;
uint16 chunksAdded = 0;
uint16 dataChunksAdded = 0;
uint32 totalChunksSent = 0;
uint32 totalPacketsSent = 0;
uint32 packetBytes = 0;
uint32 outstandingBytes = 0;
uint32 tcount = 0; // Bytes in transmission queue on the selected path
uint32 scount = 0; // Bytes in send streams
int32 bytesToSend = 0;
bool headerCreated = false;
bool rtxActive = false;
bool sendOneMorePacket = false;
bool sendingAllowed = true;
bool authAdded = false;
bool sackAdded = false;
// ====== Obtain path ====================================================
sctpEV3 << endl << "##### sendAll(";
if(pathId) {
sctpEV3 << pathId->remoteAddress;
}
sctpEV3 << ") #####" << endl;
while(sendingAllowed)
{
headerCreated = false;
if (state->bytesToRetransmit > 0) {
// There are bytes in the transmissionQ. They have to be sent first.
path = choosePathForRetransmission();
assert(path != NULL);
rtxActive = true;
}
else {
if (pathId == NULL) { // No path given => use primary path.
path = state->getPrimaryPath();
}
else {
path = pathId;
}
}
outstandingBytes = path->outstandingBytes;
assert((int32)outstandingBytes >= 0);
CounterMap::iterator tq = qCounter.roomTransQ.find(path->remoteAddress);
tcount = tq->second;
scount = qCounter.roomSumSendStreams; // includes header and padding
sctpEV3 << "\nsendAll: on " << path->remoteAddress << ":"
<< " tcount=" << tcount
<< " scount=" << scount
<< " nextTSN=" << state->nextTSN << endl;
bool sackOnly;
bool sackWithData;
timeForSack(sackOnly, sackWithData);
if (tcount == 0 && scount == 0) {
// ====== No DATA chunks to send ===================================
sctpEV3 << "No DATA chunk available!" << endl;
if (!sackOnly) { // SACK?, no data to send
sctpEV3 << "No SACK to send either" << endl;
return;
}
else {
bytes.bytesToSend = 0;
}
}
else {
bytesAllowedToSend(path, firstPass);
}
bytesToSend = bytes.bytesToSend;
// As there is at least a SACK to be sent, a header can be created
if (state->sctpMsg) // ??? Robin: Ist das in Ordnung???
{
loadPacket(path, &sctpMsg, &chunksAdded, &dataChunksAdded, &packetBytes, &authAdded);
headerCreated = true;
}
else if (bytesToSend > 0 || bytes.chunk || bytes.packet || sackWithData || sackOnly) {
sctpMsg = new SCTPMessage("send");
//printf("%d Name=%s Pointer=%p\n", __LINE__, sctpMsg->getName(), sctpMsg);
sctpMsg->setByteLength(SCTP_COMMON_HEADER);
packetBytes = 0;
headerCreated = true;
}
if (sackWithData || sackOnly)
{
// SACK can be sent
assert(headerCreated==true);
sackChunk = createSack();
chunksAdded++;
totalChunksSent++;
// ------ Create AUTH chunk, if necessary --------------------------
// ------ Add SACK chunk -------------------------------------------
sctpMsg->addChunk(sackChunk);
sackAdded = true;
if (sackOnly) // ????? Robin: SACK mit FORWARD_TSN????
{
// send the packet and leave
//printf("%d Name=%s Pointer=%p, sctpMsg\n", __LINE__, sctpMsg->getName(), sctpMsg);
state->ackState = 0;
// Stop SACK timer if it is running...
stopTimer(SackTimer);
sctpAlgorithm->sackSent();
state->sackAllowed = false;
sendToIP(sctpMsg, state->lastDataSourceAddress);
if ((bytesToSend > 0) || (bytes.chunk) || (bytes.packet)) {
sctpMsg = new SCTPMessage("send");
sctpMsg->setByteLength(SCTP_COMMON_HEADER);
packetBytes = 0;
headerCreated = true;
sackAdded = false;
}
else
return;
}
}
// ####################################################################
// #### Data Transmission ####
// ####################################################################
bool packetFull = false;
while(!packetFull && headerCreated) {
assert(headerCreated == true);
sctpEV3 << "bytesToSend=" << bytesToSend
<< " bytes.chunk=" << bytes.chunk
<< " bytes.packet=" << bytes.packet << endl;
// ====== How many bytes may be transmitted in next packet? ========
int32 allowance = path->pmtu; // Default behaviour: send 1 path MTU
if ((bytesToSend > 0) || (bytes.chunk) || (bytes.packet)) {
// Allow 1 more MTU
}
else {
// No more sending allowed.
allowance = 0;
bytesToSend = 0;
}
if ((allowance > 0) || (bytes.chunk) || (bytes.packet)) {
bool firstTime = false; // Is DATA chunk send for the first time?
// ====== Retransmission ========================================
// T.D. 05.01.2010: If bytes.packet is true, one packet is allowed
// to be retransmitted!
SCTPDataVariables* datVar = getOutboundDataChunk(path,
path->pmtu - sctpMsg->getByteLength() - 20,
(bytes.packet == true) ? path->pmtu : allowance);
if (chunksAdded==1 && sackAdded && !sackOnly && datVar==NULL) {
sctpMsg->removeChunk();
delete sackChunk;
datVar = getOutboundDataChunk(path,
path->pmtu - sctpMsg->getByteLength() - 20,
(bytes.packet == true) ? path->pmtu : allowance);
}
if (datVar != NULL) {
assert(datVar->getNextDestinationPath() == path);
datVar->numberOfRetransmissions++;
if (chunkHasBeenAcked(datVar) == false) {
sctpEV3 << simTime() << ": Retransmission #" << datVar->numberOfRetransmissions
<< " of TSN " << datVar->tsn
<< " on path " << datVar->getNextDestination()
<< " (last was " << datVar->getLastDestination() << ")" << endl;
datVar->countsAsOutstanding = true;
datVar->hasBeenReneged = false;
increaseOutstandingBytes(datVar, path); // NOTE: path == datVar->getNextDestinationPath()
}
}
// ====== First Transmission ====================================
else if ( ((scount > 0) && (!state->nagleEnabled)) || // Data to send and Nagle off
((uint32)scount >= path->pmtu - 32 - 20) || // Data to fill at least one path MTU
((scount > 0) && (state->nagleEnabled) && (outstandingBytes == 0)) ) { // Data to send, Nagle on and no outstanding bytes
if(path == state->getPrimaryPath()) {
// ------ Dequeue data message ----------------------------
sctpEV3 << "sendAll: sctpMsg->length=" << sctpMsg->getByteLength()
<< " length datMsg=" << path->pmtu-sctpMsg->getByteLength() - 20 << endl;
SCTPDataMsg* datMsg = dequeueOutboundDataMsg(path->pmtu-sctpMsg->getByteLength() - 20,
allowance);
if (chunksAdded==1 && sackAdded && !sackOnly && datMsg==NULL)
{
sctpMsg->removeChunk();
delete sackChunk;
datMsg = dequeueOutboundDataMsg(path->pmtu-sctpMsg->getByteLength() - 20,
allowance);
}
// ------ Handle data message -----------------------------
if (datMsg) {
firstTime = true;
state->queuedMessages--;
if ((state->queueLimit > 0) &&
(state->queuedMessages < state->queueLimit) &&
(state->queueUpdate == false)) {
// Tell upper layer readiness to accept more data
sendIndicationToApp(SCTP_I_SEND_MSG);
state->queueUpdate = true;
}
datVar = makeDataVarFromDataMsg(datMsg, path);
delete datMsg;
sctpEV3 << "sendAll: chunk " << datVar << " dequeued from StreamQ "
<< datVar->sid << ": tsn=" << datVar->tsn
<< ", bytes now " << qCounter.roomSumSendStreams << "\n";
}
// ------ No data message has been dequeued ---------------
else {
// ------ Are there any chunks to send? ----------------
if (chunksAdded == 0) {
// No -> nothing more to do.
if (state->sctpMsg == sctpMsg)
{
state->sctpMsg = NULL;
state->packetBytes = 0;
}
packetFull = true; // chunksAdded==0, packetFull==true => leave inner while loop
}
else {
// Yes.
if (state->nagleEnabled && (outstandingBytes > 0) &&
nextChunkFitsIntoPacket(path->pmtu-sctpMsg->getByteLength() - 20) &&
(sctpMsg->getByteLength() < path->pmtu - 32 - 20) && (tcount == 0))
{
storePacket(path, sctpMsg, chunksAdded, dataChunksAdded, packetBytes, authAdded);
packetBytes = 0;
}
//chunksAdded = 0;
packetFull = true; // chunksAdded==0, packetFull==true => leave inner while loop
sctpEV3 << "sendAll: packetFull: msg length = " << sctpMsg->getBitLength() / 8 + 20 << "\n";
}
}
}
}
// ------ Handle DATA chunk -------------------------------------
if (datVar != NULL && !packetFull) {
// ------ Assign TSN -----------------------------------------
if (firstTime) {
assert(datVar->tsn == 0);
datVar->tsn = state->nextTSN;
sctpEV3 << "sendAll: set TSN=" << datVar->tsn
<< " sid=" << datVar->sid << ", ssn=" << datVar->ssn << "\n";
state->nextTSN++;
}
SCTP::AssocStatMap::iterator iterator = sctpMain->assocStatMap.find(assocId);
iterator->second.transmittedBytes += datVar->len / 8;
datVar->setLastDestination(path);
datVar->countsAsOutstanding = true;
datVar->hasBeenReneged = false;
datVar->sendTime = simTime(); //I.R. to send Fast RTX just once a RTT
// ------ First transmission of datVar -----------------------
if (datVar->numberOfTransmissions == 0) {
sctpEV3 << "sendAll: " << simTime() << " firstTime, TSN "
<< datVar->tsn << ": lastDestination set to "
<< datVar->getLastDestination() << endl;
if (!state->firstDataSent) {
state->firstDataSent = true;
}
sctpEV3 << "sendAll: insert in retransmissionQ tsn=" << datVar->tsn << "\n";
if(!retransmissionQ->checkAndInsertChunk(datVar->tsn, datVar)) {
opp_error("Cannot add datVar to retransmissionQ!");
// Falls es hier aufschlaegt, muss ueberlegt werden, ob es OK ist, dass datVars nicht eingefuegt werden koennen.
}
else {
sctpEV3 << "sendAll: size of retransmissionQ=" << retransmissionQ->getQueueSize() << "\n";
unackChunk(datVar);
increaseOutstandingBytes(datVar, path);
}
}
/* datVar is already in the retransmissionQ */
datVar->numberOfTransmissions++;
datVar->gapReports = 0;
datVar->hasBeenFastRetransmitted = false;
sctpEV3<<"sendAll(): adding new outbound data datVar to packet (tsn="<<datVar->tsn<<")...!!!\n";
chunkPtr = transformDataChunk(datVar);
/* update counters */
totalChunksSent++;
chunksAdded++;
dataChunksAdded++;
sctpMsg->addChunk(chunkPtr);
if(nextChunkFitsIntoPacket(path->pmtu - sctpMsg->getByteLength() - 20) == false) {
// ???? Robin: Ist diese Annahme so richtig?
packetFull = true;
}
state->peerRwnd -= datVar->booksize;
if ((bytes.chunk == false) && (bytes.packet == false)) {
bytesToSend -= datVar->booksize;
}
else if (bytes.chunk) {
bytes.chunk = false;
}
else if ((bytes.packet) && (packetFull)) {
bytes.packet = false;
}
if (bytesToSend <= 0) {
if ((!packetFull) && (qCounter.roomSumSendStreams > path->pmtu - 32 - 20 || tcount > 0)) {
sendOneMorePacket = true;
bytes.packet = true;
sctpEV3 << "sendAll: one more packet allowed\n";
}
else {
if (state->nagleEnabled && (outstandingBytes > 0) &&
nextChunkFitsIntoPacket(path->pmtu-sctpMsg->getByteLength() - 20) &&
(sctpMsg->getByteLength() < path->pmtu - 32 - 20) && (tcount == 0))
{
storePacket(path, sctpMsg, chunksAdded, dataChunksAdded, packetBytes, authAdded);
packetBytes = 0;
chunksAdded = 0;
packetFull = true; // chunksAdded==0, packetFull==true => leave inner while loop
}
else {
packetFull = true;
}
}
bytesToSend = 0;
}
else if ((qCounter.roomSumSendStreams == 0) && (tq->second==0)) {
packetFull = true;
sctpEV3 << "sendAll: no data in send and transQ: packet full\n";
}
sctpEV3 << "sendAll: bytesToSend after reduction: " << bytesToSend << "\n";
}
// ------ There is no DATA chunk, only control chunks possible --
else {
// ????? Robin: Kann dieser Fall wirklich eintreten?
if (chunksAdded == 0) { // Nothing to do -> return
packetFull = true; // chunksAdded==0, packetFull==true => leave inner while loop
}
else {
packetFull = true;
sctpEV3 << "sendAll: packetFull: msg length = " << sctpMsg->getBitLength() / 8 + 20 << "\n";
datVar = NULL;
}
}
// ====== Send packet ===========================================
if (packetFull) {
if(chunksAdded == 0) { // Nothing to send
delete sctpMsg;
sendingAllowed = false; // sendingAllowed==false => leave outer while loop
}
else {
sctpEV3 << "sendAll: " << simTime() << " packet full:"
<< " totalLength=" << sctpMsg->getBitLength() / 8 + 20
<< ", path=" << path->remoteAddress
<< " " << dataChunksAdded << " chunks added, outstandingBytes now "
<< path->outstandingBytes << "\n";
/* new chunks would exceed MTU, so we send old packet and build a new one */
/* this implies that at least one data chunk is send here */
if (dataChunksAdded > 0) {
if (!path->T3_RtxTimer->isScheduled()) {
// Start retransmission timer, if not scheduled before
startTimer(path->T3_RtxTimer, path->pathRto);
}
else {
sctpEV3 << "sendAll: RTX Timer already scheduled -> no need to schedule it\n";
}
}
if (sendOneMorePacket) {
sendOneMorePacket = false;
bytesToSend = 0;
bytes.packet = false;
}
sendToIP(sctpMsg, path->remoteAddress);
pmDataIsSentOn(path);
totalPacketsSent++;
// ------ Reset status ------------------------------------
if (state->sctpMsg == sctpMsg)
{
state->sctpMsg = NULL;
path->outstandingBytes += packetBytes;
packetBytes = 0;
}
headerCreated = false;
chunksAdded = 0;
dataChunksAdded = 0;
firstTime = false;
sctpEV3 << "sendAll: sending Packet to path " << path->remoteAddress
<< " scount=" << scount
<< " tcount=" << tcount
<< " bytesToSend=" << bytesToSend << endl;
}
}
sctpEV3 << "sendAll: still " << bytesToSend
<< " bytes to send, headerCreated=" << headerCreated << endl;
} // if (bytesToSend > 0 || bytes.chunk || bytes.packet)
else {
packetFull = true; // Leave inner while loop
delete sctpMsg; // T.D. 19.01.2010: Free unsent message
}
sctpEV3 << "packetFull=" << packetFull << endl;
} // while(!packetFull)
sctpEV3 << "bytesToSend=" << bytesToSend
<< " bytes.chunk=" << bytes.chunk
<< " bytes.packet=" << bytes.packet << endl;
if (!(bytesToSend > 0 || bytes.chunk || bytes.packet)) {
sendingAllowed = false;
}
} // while(sendingAllowed)
sctpEV3 << "sendAll: nothing more to send... BYE!\n";
}
| void SCTPAssociation::sendSack | ( | ) | [protected] |
Referenced by process_RCV_Message(), processTimer(), and pushUlp().
{
SCTPSackChunk* sackChunk;
sctpEV3 << "Sending SACK" << endl;
/* sack timer has expired, reset flag, and send SACK */
stopTimer(SackTimer);
state->ackState = 0;
sackChunk = createSack();
SCTPMessage* sctpmsg = new SCTPMessage();
sctpmsg->setBitLength(SCTP_COMMON_HEADER*8);
sctpmsg->addChunk(sackChunk);
// Return the SACK to the address where we last got a data chunk from
sendToIP(sctpmsg, state->lastDataSourceAddress);
}
| void SCTPAssociation::sendShutdown | ( | ) | [protected] |
Referenced by process_CLOSE(), process_RCV_Message(), and stateEntered().
{
SCTPMessage *msg = new SCTPMessage();
msg->setBitLength(SCTP_COMMON_HEADER*8);
sctpEV3<<"SCTPAssociationUtil:sendShutdown localPort="<<localPort<<" remotePort="<<remotePort<<"\n";
msg->setSrcPort(localPort);
msg->setDestPort(remotePort);
SCTPShutdownChunk* shutdownChunk = new SCTPShutdownChunk("SHUTDOWN");
shutdownChunk->setChunkType(SHUTDOWN);
//shutdownChunk->setCumTsnAck(state->lastTsnAck);
shutdownChunk->setCumTsnAck(state->cTsnAck);
shutdownChunk->setBitLength(SCTP_SHUTDOWN_CHUNK_LENGTH*8);
state->initRexmitTimeout = SCTP_TIMEOUT_INIT_REXMIT;
state->initRetransCounter = 0;
stopTimer(T5_ShutdownGuardTimer);
startTimer(T5_ShutdownGuardTimer,SHUTDOWN_GUARD_TIMEOUT);
stopTimer(T2_ShutdownTimer);
startTimer(T2_ShutdownTimer,state->initRexmitTimeout);
state->shutdownChunk=check_and_cast<SCTPShutdownChunk*>(shutdownChunk->dup());
msg->addChunk(shutdownChunk);
sendToIP(msg, remoteAddr);
performStateTransition(SCTP_E_NO_MORE_OUTSTANDING);
}
| void SCTPAssociation::sendShutdownAck | ( | const IPvXAddress & | dest | ) | [protected] |
Referenced by performStateTransition(), process_CLOSE(), process_RCV_Message(), processAppCommand(), and stateEntered().
{
sendOnAllPaths(getPath(dest));
if (getOutstandingBytes() == 0) {
performStateTransition(SCTP_E_NO_MORE_OUTSTANDING);
SCTPMessage *sctpshutdownack = new SCTPMessage();
sctpshutdownack->setBitLength(SCTP_COMMON_HEADER*8);
sctpEV3 << "SCTPAssociationUtil:sendShutdownACK" << endl;
sctpshutdownack->setSrcPort(localPort);
sctpshutdownack->setDestPort(remotePort);
SCTPShutdownAckChunk* shutdownAckChunk=new SCTPShutdownAckChunk("SHUTDOWN_ACK");
shutdownAckChunk->setChunkType(SHUTDOWN_ACK);
shutdownAckChunk->setBitLength(SCTP_COOKIE_ACK_LENGTH*8);
sctpshutdownack->addChunk(shutdownAckChunk);
state->initRexmitTimeout = SCTP_TIMEOUT_INIT_REXMIT;
state->initRetransCounter = 0;
stopTimer(T2_ShutdownTimer);
startTimer(T2_ShutdownTimer,state->initRexmitTimeout);
stopTimer(T5_ShutdownGuardTimer);
startTimer(T5_ShutdownGuardTimer,SHUTDOWN_GUARD_TIMEOUT);
state->shutdownAckChunk=check_and_cast<SCTPShutdownAckChunk*>(shutdownAckChunk->dup());
sendToIP(sctpshutdownack, dest);
}
}
| void SCTPAssociation::sendShutdownComplete | ( | ) | [protected] |
Referenced by process_RCV_Message().
{
SCTPMessage *sctpshutdowncomplete = new SCTPMessage();
sctpshutdowncomplete->setBitLength(SCTP_COMMON_HEADER*8);
sctpEV3<<"SCTPAssociationUtil:sendShutdownComplete\n";
sctpshutdowncomplete->setSrcPort(localPort);
sctpshutdowncomplete->setDestPort(remotePort);
SCTPShutdownCompleteChunk* shutdownCompleteChunk=new SCTPShutdownCompleteChunk("SHUTDOWN_COMPLETE");
shutdownCompleteChunk->setChunkType(SHUTDOWN_COMPLETE);
shutdownCompleteChunk->setTBit(0);
shutdownCompleteChunk->setBitLength(SCTP_SHUTDOWN_ACK_LENGTH*8);
sctpshutdowncomplete->addChunk(shutdownCompleteChunk);
sendToIP(sctpshutdowncomplete);
}
| void SCTPAssociation::sendToApp | ( | cPacket * | msg | ) | [protected] |
Utility: sends packet to application
Referenced by pathStatusIndication(), process_STATUS(), pushUlp(), and sendDataArrivedNotification().
{
sctpMain->send(msg, "to_appl", appGateIndex);
}
| void SCTPAssociation::sendToIP | ( | SCTPMessage * | sctpmsg, |
| const IPvXAddress & | dest, | ||
| const bool | qs = false |
||
| ) | [protected] |
Utility: adds control info to message and sends it to IP
Referenced by retransmitCookieEcho(), retransmitInit(), retransmitShutdown(), retransmitShutdownAck(), sendAbort(), sendCookieAck(), sendCookieEcho(), sendHeartbeat(), sendHeartbeatAck(), sendInit(), sendInitAck(), sendOnPath(), sendSack(), sendShutdown(), sendShutdownAck(), and sendShutdownComplete().
{
// Final touches on the segment before sending
sctpmsg->setSrcPort(localPort);
sctpmsg->setDestPort(remotePort);
sctpmsg->setChecksumOk(true);
const SCTPChunk* chunk = (const SCTPChunk*)(sctpmsg->peekFirstChunk());
if (chunk->getChunkType() == ABORT) {
const SCTPAbortChunk* abortChunk = check_and_cast<const SCTPAbortChunk *>(chunk);
if (abortChunk->getT_Bit() == 1) {
sctpmsg->setTag(peerVTag);
}
else {
sctpmsg->setTag(localVTag);
}
}
else if (sctpmsg->getTag() == 0) {
sctpmsg->setTag(localVTag);
}
if ((bool)sctpMain->par("udpEncapsEnabled")) {
sctpmsg->setKind(UDP_C_DATA);
UDPControlInfo* controlInfo = new UDPControlInfo();
controlInfo->setSrcPort(9899);
controlInfo->setDestAddr(remoteAddr.get4());
controlInfo->setDestPort(9899);
sctpmsg->setControlInfo(controlInfo);
}
else {
if (dest.isIPv6()) {
IPv6ControlInfo* controlInfo = new IPv6ControlInfo();
controlInfo->setProtocol(IP_PROT_SCTP);
controlInfo->setSrcAddr(IPv6Address());
controlInfo->setDestAddr(dest.get6());
sctpmsg->setControlInfo(controlInfo);
sctpMain->send(sctpmsg, "to_ipv6");
}
else {
IPControlInfo* controlInfo = new IPControlInfo();
controlInfo->setProtocol(IP_PROT_SCTP);
controlInfo->setSrcAddr(IPAddress("0.0.0.0"));
controlInfo->setDestAddr(dest.get4());
sctpmsg->setControlInfo(controlInfo);
sctpMain->send(sctpmsg, "to_ip");
}
recordInPathVectors(sctpmsg, dest);
}
sctpEV3 << "Sent to " << dest << endl;
}
| void SCTPAssociation::sendToIP | ( | SCTPMessage * | sctpmsg, |
| const bool | qs = false |
||
| ) | [inline, protected] |
{
sendToIP(sctpmsg, remoteAddr, qs);
}
| void SCTPAssociation::signalConnectionTimeout | ( | ) | [protected] |
Utility: signal to user that connection timed out
| static int16 SCTPAssociation::ssnGt | ( | const uint16 | ssn1, |
| const uint16 | ssn2 | ||
| ) | [inline, static, protected] |
Referenced by makeRoomForTsn().
{ return ((int16)(ssn1-ssn2)>0); }
| void SCTPAssociation::startTimer | ( | cMessage * | timer, |
| const simtime_t & | timeout | ||
| ) | [protected] |
Referenced by pmDataIsSentOn(), pmStartPathManagement(), process_ASSOCIATE(), process_RCV_Message(), process_TIMEOUT_HEARTBEAT_INTERVAL(), process_TIMEOUT_INIT_REXMIT(), process_TIMEOUT_SHUTDOWN(), processInitAckArrived(), processSackArrived(), scheduleSack(), sendOnPath(), sendShutdown(), sendShutdownAck(), and tsnWasReneged().
{
sctpEV3 << "startTimer " << timer->getName() << " with timeout "
<< timeout << " to expire at " << simTime() + timeout << endl;
scheduleTimeout(timer, timeout);
}
| void SCTPAssociation::stateEntered | ( | int32 | state | ) | [protected] |
Referenced by performStateTransition().
{
switch (status)
{
case SCTP_S_COOKIE_WAIT:
break;
case SCTP_S_ESTABLISHED:
{
sctpEV3 << "State ESTABLISHED entered" << endl;
stopTimer(T1_InitTimer);
if (state->initChunk) {
delete state->initChunk;
}
state->nagleEnabled = (bool)sctpMain->par("nagleEnabled");
state->enableHeartbeats = (bool)sctpMain->par("enableHeartbeats");
state->numGapReports = sctpMain->par("numGapReports");
state->maxBurst = (uint32)sctpMain->par("maxBurst");
state->header = 0;
state->swsLimit = (uint32)sctpMain->par("swsLimit");
state->fastRecoverySupported = (bool)sctpMain->par("fastRecoverySupported");
state->reactivatePrimaryPath = (bool)sctpMain->par("reactivatePrimaryPath");
sackPeriod = (double)sctpMain->par("sackPeriod");
sackFrequency = sctpMain->par("sackFrequency");
SCTP::AssocStat stat;
stat.assocId = assocId;
stat.start = simulation.getSimTime();
stat.stop = 0;
stat.rcvdBytes = 0;
stat.ackedBytes = 0;
stat.sentBytes = 0;
stat.transmittedBytes = 0;
stat.numFastRtx = 0;
stat.numT3Rtx = 0;
stat.numDups = 0;
stat.numPathFailures = 0;
stat.numForwardTsn = 0;
stat.lifeTime = 0;
stat.throughput = 0;
sctpMain->assocStatMap[stat.assocId] = stat;
ccModule = sctpMain->par("ccModule");
switch (ccModule)
{
case RFC4960:
{
ccFunctions.ccInitParams = &SCTPAssociation::initCCParameters;
ccFunctions.ccUpdateAfterSack = &SCTPAssociation::cwndUpdateAfterSack;
ccFunctions.ccUpdateAfterCwndTimeout = &SCTPAssociation::cwndUpdateAfterCwndTimeout;
ccFunctions.ccUpdateAfterRtxTimeout = &SCTPAssociation::cwndUpdateAfterRtxTimeout;
ccFunctions.ccUpdateMaxBurst = &SCTPAssociation::cwndUpdateMaxBurst;
ccFunctions.ccUpdateBytesAcked = &SCTPAssociation::cwndUpdateBytesAcked;
break;
}
}
pmStartPathManagement();
state->sendQueueLimit = (uint32)sctpMain->par("sendQueueLimit");
sendEstabIndicationToApp();
char str[128];
snprintf(str, sizeof(str), "Cumulated TSN Ack of Association %d", assocId);
cumTsnAck = new cOutVector(str);
snprintf(str, sizeof(str), "Number of Gap Blocks in Last SACK of Association %d", assocId);
numGapBlocks = new cOutVector(str);
snprintf(str, sizeof(str), "SendQueue of Association %d", assocId);
sendQueue = new cOutVector(str);
state->sendQueueLimit = (uint32)sctpMain->par("sendQueueLimit");
SCTP::VTagPair vtagPair;
vtagPair.peerVTag = peerVTag;
vtagPair.localVTag = localVTag;
vtagPair.localPort = localPort;
vtagPair.remotePort = remotePort;
sctpMain->sctpVTagMap[assocId] = vtagPair;
break;
}
case SCTP_S_CLOSED:
{
sendIndicationToApp(SCTP_I_CLOSED);
break;
}
case SCTP_S_SHUTDOWN_PENDING:
{
if (getOutstandingBytes()==0 && transmissionQ->getQueueSize()==0 && qCounter.roomSumSendStreams==0)
sendShutdown();
break;
}
case SCTP_S_SHUTDOWN_RECEIVED:
{
sctpEV3 << "Entered state SHUTDOWN_RECEIVED, osb=" << getOutstandingBytes()
<< ", transQ=" << transmissionQ->getQueueSize()
<< ", scount=" << qCounter.roomSumSendStreams << endl;
if (getOutstandingBytes()==0 && transmissionQ->getQueueSize()==0 && qCounter.roomSumSendStreams==0) {
sendShutdownAck(remoteAddr);
}
else {
sendOnAllPaths(state->getPrimaryPath());
}
break;
}
}
}
| const char * SCTPAssociation::stateName | ( | const int32 | state | ) | [static] |
Utility: returns name of SCTP_S_xxx constants
Referenced by performStateTransition(), printConnBrief(), process_RCV_Message(), process_STATUS(), process_TIMEOUT_INIT_REXMIT(), and processAppCommand().
{
#define CASE(x) case x: s=#x+7; break
const char* s = "unknown";
switch (state) {
CASE(SCTP_S_CLOSED);
CASE(SCTP_S_COOKIE_WAIT);
CASE(SCTP_S_COOKIE_ECHOED);
CASE(SCTP_S_ESTABLISHED);
CASE(SCTP_S_SHUTDOWN_PENDING);
CASE(SCTP_S_SHUTDOWN_SENT);
CASE(SCTP_S_SHUTDOWN_RECEIVED);
CASE(SCTP_S_SHUTDOWN_ACK_SENT);
}
return s;
#undef CASE
}
| void SCTPAssociation::stopTimer | ( | cMessage * | timer | ) |
Referenced by moveChunkToOtherPath(), pmDataIsSentOn(), pmStartPathManagement(), process_RCV_Message(), process_TIMEOUT_HEARTBEAT_INTERVAL(), processCookieAckArrived(), processHeartbeatAckArrived(), processInitAckArrived(), processSackArrived(), processTimer(), SCTP::removeAssociation(), removePath(), sendOnPath(), sendSack(), sendShutdown(), sendShutdownAck(), stateEntered(), and stopTimers().
{
ev << "stopTimer " << timer->getName() << endl;
if (timer->isScheduled()) {
cancelEvent(timer);
}
}
| void SCTPAssociation::stopTimers | ( | ) |
Referenced by process_RCV_Message().
{
for (SCTPPathMap::iterator j = sctpPathMap.begin(); j != sctpPathMap.end(); j++) {
stopTimer(j->second->HeartbeatTimer);
stopTimer(j->second->HeartbeatIntervalTimer);
}
}
| void SCTPAssociation::storePacket | ( | SCTPPathVariables * | pathVar, |
| SCTPMessage * | sctpMsg, | ||
| const uint16 | chunksAdded, | ||
| const uint16 | dataChunksAdded, | ||
| const uint32 | packetBytes, | ||
| const bool | authAdded | ||
| ) | [private] |
Referenced by sendOnPath().
{
for (uint16 i = 0; i < sctpMsg->getChunksArraySize(); i++) {
retransmissionQ->payloadQueue.find(((SCTPDataChunk*)sctpMsg->getChunks(i))->getTsn())->second->countsAsOutstanding = false;
}
state->sctpMsg = sctpMsg;
state->chunksAdded = chunksAdded;
state->dataChunksAdded = dataChunksAdded;
state->packetBytes = packetBytes;
sctpEV3 << "storePacket: path=" << pathVar->remoteAddress
<< " osb=" << pathVar->outstandingBytes << " -> "
<< pathVar->outstandingBytes - state->packetBytes << endl;
assert(pathVar->outstandingBytes >= state->packetBytes);
pathVar->outstandingBytes -= state->packetBytes;
qCounter.roomSumSendStreams += state->packetBytes + (dataChunksAdded * SCTP_DATA_CHUNK_LENGTH);
qCounter.bookedSumSendStreams += state->packetBytes;
}
| int32 SCTPAssociation::streamScheduler | ( | bool | peek | ) | [protected] |
Dealing with streams
Referenced by SCTPAssociation().
{
int32 sid, testsid;
sctpEV3<<"Stream Scheduler: RoundRobin\n";
sid = -1;
if ((state->ssLastDataChunkSizeSet == false || state->ssNextStream == false) &&
(sendStreams.find(state->lastStreamScheduled)->second->getUnorderedStreamQ()->length() > 0 ||
sendStreams.find(state->lastStreamScheduled)->second->getStreamQ()->length() > 0))
{
sid = state->lastStreamScheduled;
sctpEV3<<"Stream Scheduler: again sid " << sid << ".\n";
state->ssNextStream = true;
}
else
{
testsid = state->lastStreamScheduled;
do {
testsid = (testsid + 1) % outboundStreams;
if (sendStreams.find(testsid)->second->getUnorderedStreamQ()->length() > 0 ||
sendStreams.find(testsid)->second->getStreamQ()->length() > 0)
{
sid = testsid;
sctpEV3<<"Stream Scheduler: chose sid " << sid << ".\n";
if (!peek)
state->lastStreamScheduled = sid;
}
} while (sid == -1 && testsid != (int32) state->lastStreamScheduled);
}
sctpEV3<<"streamScheduler sid="<<sid<<" lastStream="<<state->lastStreamScheduled<<" outboundStreams="<<outboundStreams<<" next="<<state->ssNextStream<<"\n";
state->ssLastDataChunkSizeSet = false;
return sid;
}
| void SCTPAssociation::timeForSack | ( | bool & | sackOnly, |
| bool & | sackWithData | ||
| ) | [private] |
Referenced by sendOnPath().
{
sackOnly = sackWithData = false;
if (((state->numGaps > 0) || (state->dupList.size() > 0)) &&
(state->sackAllowed)) {
// Schedule sending of SACKs at once, when we have fragments to report
state->ackState = sackFrequency;
sackOnly = sackWithData = true; // SACK necessary, regardless of data available
}
if (state->ackState >= sackFrequency) {
sackOnly = sackWithData = true; // SACK necessary, regardless of data available
}
else if (SackTimer->isScheduled()) {
sackOnly = false;
sackWithData = true; // Only send SACK when data is present.
}
}
| SCTPDataChunk * SCTPAssociation::transformDataChunk | ( | SCTPDataVariables * | chunk | ) | [protected] |
Manipulating chunks
Referenced by sendOnPath().
{
SCTPDataChunk* dataChunk = new SCTPDataChunk("DATA");
SCTPSimpleMessage* msg = check_and_cast<SCTPSimpleMessage*>(chunk->userData->dup());
dataChunk->setChunkType(DATA);
dataChunk->setBBit(chunk->bbit);
dataChunk->setEBit(chunk->ebit);
if (chunk->ordered) {
dataChunk->setUBit(0);
}
else {
dataChunk->setUBit(1);
}
dataChunk->setTsn(chunk->tsn);
dataChunk->setSid(chunk->sid);
dataChunk->setSsn(chunk->ssn);
dataChunk->setPpid(chunk->ppid);
dataChunk->setEnqueuingTime(chunk->enqueuingTime);
dataChunk->setBitLength(SCTP_DATA_CHUNK_LENGTH*8);
msg->setBitLength(chunk->len);
dataChunk->encapsulate(msg);
return dataChunk;
}
| static int32 SCTPAssociation::tsnBetween | ( | const uint32 | tsn1, |
| const uint32 | midtsn, | ||
| const uint32 | tsn2 | ||
| ) | [inline, static, protected] |
Referenced by removeFromGapList(), tsnIsDuplicate(), and updateGapList().
{ return ((tsn2-tsn1)>=(midtsn-tsn1)); }
| static int32 SCTPAssociation::tsnGe | ( | const uint32 | tsn1, |
| const uint32 | tsn2 | ||
| ) | [inline, static, protected] |
Referenced by createSack(), and dequeueAckedChunks().
{ return ((int32)(tsn1-tsn2)>=0); }
| static int32 SCTPAssociation::tsnGt | ( | const uint32 | tsn1, |
| const uint32 | tsn2 | ||
| ) | [inline, static, protected] |
Referenced by createSack(), cwndUpdateAfterSack(), processDataArrived(), processSackArrived(), updateFastRecoveryStatus(), and updateGapList().
{ return ((int32)(tsn1-tsn2)>0); }
| bool SCTPAssociation::tsnIsDuplicate | ( | const uint32 | tsn | ) | const [protected] |
Methods dealing with the handling of TSNs
Referenced by processDataArrived().
{
for (std::list<uint32>::const_iterator iterator = state->dupList.begin();
iterator != state->dupList.end(); iterator++)
{
if ((*iterator) == tsn)
return true;
}
for (uint32 i=0; i < state->numGaps; i++) {
if (tsnBetween(state->gapStartList[i], tsn, state->gapStopList[i])) {
return true;
}
}
return false;
}
| static int32 SCTPAssociation::tsnLe | ( | const uint32 | tsn1, |
| const uint32 | tsn2 | ||
| ) | [inline, static, protected] |
Referenced by processDataArrived().
{ return ((int32)(tsn1-tsn2)<=0); }
| static int32 SCTPAssociation::tsnLt | ( | const uint32 | tsn1, |
| const uint32 | tsn2 | ||
| ) | [inline, static, protected] |
| void SCTPAssociation::tsnWasReneged | ( | SCTPDataVariables * | chunk, |
| const int | type | ||
| ) | [protected] |
Referenced by handleChunkReportedAsMissing(), and processSackArrived().
{
#ifdef PKT
if (transmissionQ->getQueueSize() > 0) {
for (SCTPQueue::PayloadQueue::iterator tq = transmissionQ->payloadQueue.begin();
tq != transmissionQ->payloadQueue.end(); tq++) {
if (tq->second->tsn > chunk->tsn) {
if (!transmissionQ->checkAndInsertChunk(chunk->tsn, chunk)) {
sctpEV3 << "tsnWasReneged: cannot add message/chunk (TSN="
<< chunk->tsn <<") to the transmissionQ" << endl;
}
else {
chunk->enqueuedInTransmissionQ = true;
chunk->setNextDestination(chunk->getLastDestinationPath());
CounterMap::iterator q = qCounter.roomTransQ.find(chunk->getNextDestination());
q->second+=ADD_PADDING(chunk->len/8+SCTP_DATA_CHUNK_LENGTH);
CounterMap::iterator qb=qCounter.bookedTransQ.find(chunk->getNextDestination());
qb->second+=chunk->booksize;
return;
}
}
}
}
#endif
sctpEV3 << "TSN " << chunk->tsn << " has been reneged (type "
<< type << ")" << endl;
unackChunk(chunk);
if (chunk->countsAsOutstanding) {
decreaseOutstandingBytes(chunk);
}
chunk->hasBeenReneged = true;
chunk->gapReports = 1;
if (!chunk->getLastDestinationPath()->T3_RtxTimer->isScheduled()) {
startTimer(chunk->getLastDestinationPath()->T3_RtxTimer,
chunk->getLastDestinationPath()->pathRto);
}
}
| void SCTPAssociation::unackChunk | ( | SCTPDataVariables * | chunk | ) | [inline, private] |
Referenced by sendOnPath(), and tsnWasReneged().
{
chunk->hasBeenAcked = false;
}
| int32 SCTPAssociation::updateCounters | ( | SCTPPathVariables * | path | ) | [protected] |
{
bool notifyUlp = false;
if (++state->errorCount >= (uint32)sctpMain->par("assocMaxRetrans"))
{
sctpEV3 << "Retransmission count during connection setup exceeds " << (int32)sctpMain->par("assocMaxRetrans") << ", giving up\n";
sendIndicationToApp(SCTP_I_CLOSED);
sendAbort();
sctpMain->removeAssociation(this);
return 0;
}
else if (++path->pathErrorCount >= (uint32)sctpMain->par("pathMaxRetrans"))
{
if (path->activePath)
{
/* tell the source */
notifyUlp = true;
}
path->activePath = false;
if (path == state->getPrimaryPath()) {
state->setPrimaryPath(getNextPath(path));
}
sctpEV3<<"process_TIMEOUT_RESET("<<(path->remoteAddress)<<") : PATH ERROR COUNTER EXCEEDED, path status is INACTIVE\n";
if (allPathsInactive())
{
sctpEV3<<"process_TIMEOUT_RESET : ALL PATHS INACTIVE --> closing ASSOC\n";
sendIndicationToApp(SCTP_I_CONN_LOST);
sendAbort();
sctpMain->removeAssociation(this);
return 0;
}
else if (notifyUlp)
{
/* notify the application */
pathStatusIndication(path, false);
}
sctpEV3<<"process_TIMEOUT_RESET("<<(path->remoteAddress)<<") : PATH ERROR COUNTER now "<<path->pathErrorCount<<"\n";
return 2;
}
return 1;
}
| void SCTPAssociation::updateFastRecoveryStatus | ( | const uint32 | lastTsnAck | ) | [protected] |
Referenced by processSackArrived().
{
for (SCTPPathMap::iterator iter = sctpPathMap.begin(); iter != sctpPathMap.end(); iter++) {
SCTPPathVariables* path = iter->second;
if (path->fastRecoveryActive) {
if ( (tsnGt(lastTsnAck, path->fastRecoveryExitPoint)) ||
(lastTsnAck == path->fastRecoveryExitPoint)
) {
path->fastRecoveryActive = false;
path->fastRecoveryExitPoint = 0;
sctpEV3 << simTime() << ":\tCC [cwndUpdateAfterSack] Leaving Fast Recovery on path "
<< path->remoteAddress
<< ", lastTsnAck=" << lastTsnAck << endl;
}
}
}
}
| bool SCTPAssociation::updateGapList | ( | const uint32 | tsn | ) | [protected] |
TSN either sits at the end of one gap, and thus changes gap boundaries, or it is in between two gaps, and becomes a new gap
Referenced by processDataArrived().
{
sctpEV3 << "Entering updateGapList (tsn=" << receivedTsn
<< " cTsnAck=" <<state->cTsnAck << " Number of Gaps="
<< state->numGaps << endl;
uint32 lo = state->cTsnAck + 1;
if ((int32)(state->localRwnd-state->queuedReceivedBytes) <= 0)
{
sctpEV3 << "Window full" << endl;
// Only check if cumTsnAck can be advanced
if (receivedTsn == lo) {
sctpEV3 << "Window full, but cumTsnAck can be advanced:" << lo << endl;
}
else
return false;
}
if (tsnGt(receivedTsn, state->highestTsnStored)) { // 17.06.08
state->highestTsnStored = receivedTsn;
}
for (uint32 i = 0; i<state->numGaps; i++) {
if (state->gapStartList[i] > 0) {
const uint32 hi = state->gapStartList[i] - 1;
if (tsnBetween(lo, receivedTsn, hi)) {
const uint32 gapsize = hi - lo + 1;
if (gapsize > 1) {
if (receivedTsn == hi) {
state->gapStartList[i] = receivedTsn;
state->newChunkReceived = true;
return true;
}
else if (receivedTsn == lo) {
if (receivedTsn == (state->cTsnAck + 1)) {
state->cTsnAck++;
state->newChunkReceived = true;
return true;
}
/* some gap must increase its upper bound */
state->gapStopList[i-1] = receivedTsn;
state->newChunkReceived = true;
return true;
}
else { /* a gap in between */
state->numGaps = min(state->numGaps + 1, MAX_GAP_COUNT); // T.D. 18.12.09: Enforce upper limit!
for (uint32 j = state->numGaps - 1; j > i; j--) { // T.D. 18.12.09: Fixed invalid start value.
state->gapStartList[j] = state->gapStartList[j-1];
state->gapStopList[j] = state->gapStopList[j-1];
}
state->gapStartList[i]=receivedTsn;
state->gapStopList[i]=receivedTsn;
state->newChunkReceived = true;
return true;
}
}
else { /* alright: gapsize is 1: our received tsn may close gap between fragments */
if (lo == state->cTsnAck + 1) {
state->cTsnAck = state->gapStopList[i];
if (i == state->numGaps-1) {
state->gapStartList[i] = 0;
state->gapStopList[i] = 0;
}
else {
for (uint32 j = i; j < state->numGaps - 1; j++) { // T.D. 18.12.09: Fixed invalid end value.
state->gapStartList[j] = state->gapStartList[j + 1];
state->gapStopList[j] = state->gapStopList[j + 1];
}
}
state->numGaps--;
state->newChunkReceived = true;
return true;
}
else {
state->gapStopList[i-1] = state->gapStopList[i];
if (i == state->numGaps-1) {
state->gapStartList[i] = 0;
state->gapStopList[i] = 0;
}
else {
for (uint32 j = i; j < state->numGaps - 1; j++) { // T.D. 18.12.09: Fixed invalid end value.
state->gapStartList[j] = state->gapStartList[j + 1];
state->gapStopList[j] = state->gapStopList[j + 1];
}
}
state->numGaps--;
state->newChunkReceived = true;
return true;
}
}
}
else { /* receivedTsn is not in the gap between these fragments... */
lo = state->gapStopList[i] + 1;
}
} /* end: for */
}/* end: for */
/* (NULL LIST) OR (End of Gap List passed) */
if (receivedTsn == lo) { // just increase ctsna, handle further update of ctsna later
if (receivedTsn == state->cTsnAck + 1) {
state->cTsnAck = receivedTsn;
state->newChunkReceived = true;
return true;
}
/* Update last fragment....increase stop_tsn by one */
state->gapStopList[state->numGaps-1]++;
state->newChunkReceived = true;
return true;
}
else { // A new fragment altogether, past the end of the list
if(state->numGaps + 1 <= MAX_GAP_COUNT) { // T.D. 18.12.09: Enforce upper limit!
state->gapStartList[state->numGaps] = receivedTsn;
state->gapStopList[state->numGaps] = receivedTsn;
state->numGaps++;
state->newChunkReceived = true;
}
return true;
}
return false;
}
friend class SCTP [friend] |
friend class SCTPPathVariables [friend] |
Referenced by addPath(), processInitAckArrived(), processInitArrived(), and sendInit().
cOutVector* SCTPAssociation::advRwnd [protected] |
Referenced by createSack(), SCTPAssociation(), and ~SCTPAssociation().
| int32 SCTPAssociation::assocId |
Referenced by SCTP::addForkedAssociation(), cloneAssociation(), dequeueAckedChunks(), handleChunkReportedAsMissing(), pathStatusIndication(), printConnBrief(), SCTP::printInfoConnMap(), process_ABORT(), process_CLOSE(), process_OPEN_PASSIVE(), process_SEND(), process_TIMEOUT_RTX(), processDataArrived(), processInitArrived(), pushUlp(), SCTP::removeAssociation(), SCTPAssociation(), SCTPPathVariables::SCTPPathVariables(), sendDataArrivedNotification(), sendEstabIndicationToApp(), sendIndicationToApp(), sendOnPath(), and stateEntered().
BytesToBeSent SCTPAssociation::bytes [protected] |
Referenced by bytesAllowedToSend(), SCTPAssociation(), and sendOnPath().
CCFunctions SCTPAssociation::ccFunctions [protected] |
Referenced by process_TIMEOUT_RTX(), processSackArrived(), processTimer(), and stateEntered().
uint16 SCTPAssociation::ccModule [protected] |
Referenced by stateEntered().
cOutVector* SCTPAssociation::cumTsnAck [protected] |
Referenced by SCTPAssociation(), stateEntered(), and ~SCTPAssociation().
cFSM* SCTPAssociation::fsm [protected] |
Referenced by cloneAssociation(), performStateTransition(), printConnBrief(), process_ABORT(), process_ASSOCIATE(), process_CLOSE(), process_OPEN_PASSIVE(), process_RCV_Message(), process_SEND(), process_STATUS(), process_TIMEOUT_INIT_REXMIT(), process_TIMEOUT_SHUTDOWN(), processAppCommand(), processCookieAckArrived(), processCookieEchoArrived(), processInitAckArrived(), processInitArrived(), pushUlp(), SCTPAssociation(), sendInitAck(), and ~SCTPAssociation().
uint32 SCTPAssociation::inboundStreams [protected] |
uint32 SCTPAssociation::initPeerTsn [protected] |
Referenced by processDataArrived(), processInitAckArrived(), processInitArrived(), and sendInitAck().
uint32 SCTPAssociation::initTsn [protected] |
Referenced by sendInit(), and sendInitAck().
Referenced by SCTP::addLocalAddress(), SCTP::addLocalAddressToAllRemoteAddresses(), cloneAssociation(), createSack(), printConnBrief(), process_RCV_Message(), process_SEND(), processInitArrived(), processSCTPMessage(), pushUlp(), retransmitCookieEcho(), retransmitShutdown(), retransmitShutdownAck(), sendEstabIndicationToApp(), sendIndicationToApp(), sendInit(), sendInitAck(), and SCTP::updateSockPair().
AddressVector SCTPAssociation::localAddressList [protected] |
Referenced by cloneAssociation(), process_ASSOCIATE(), process_OPEN_PASSIVE(), processInitArrived(), and sendInit().
| uint16 SCTPAssociation::localPort |
Referenced by SCTP::addLocalAddress(), SCTP::addLocalAddressToAllRemoteAddresses(), SCTP::addRemoteAddress(), cloneAssociation(), printConnBrief(), process_ASSOCIATE(), process_OPEN_PASSIVE(), processSCTPMessage(), SCTP::removeLocalAddressFromAllRemoteAddresses(), SCTP::removeRemoteAddressFromAllConnections(), SCTPAssociation(), sendAbort(), sendCookieAck(), sendCookieEcho(), sendEstabIndicationToApp(), sendHeartbeat(), sendHeartbeatAck(), sendInit(), sendInitAck(), sendShutdown(), sendShutdownAck(), sendShutdownComplete(), sendToIP(), stateEntered(), and SCTP::updateSockPair().
| uint32 SCTPAssociation::localVTag |
uint32 SCTPAssociation::numberOfRemoteAddresses [protected] |
Referenced by processInitAckArrived(), processInitArrived(), and SCTPAssociation().
cOutVector* SCTPAssociation::numGapBlocks [protected] |
Referenced by SCTPAssociation(), stateEntered(), and ~SCTPAssociation().
uint32 SCTPAssociation::outboundStreams [protected] |
| uint32 SCTPAssociation::peerVTag |
Referenced by process_RCV_Message(), processCookieEchoArrived(), SCTPAssociation(), sendInit(), sendInitAck(), sendToIP(), and stateEntered().
QueueCounter SCTPAssociation::qCounter [protected] |
Referenced by addPath(), bytesAllowedToSend(), choosePathForRetransmission(), decreaseOutstandingBytes(), dequeueAckedChunks(), dequeueOutboundDataMsg(), getOutboundDataChunk(), handleChunkReportedAsAcked(), handleChunkReportedAsMissing(), increaseOutstandingBytes(), loadPacket(), makeRoomForTsn(), moveChunkToOtherPath(), process_RCV_Message(), process_SEND(), processInitAckArrived(), processInitArrived(), processSackArrived(), pushUlp(), putInDeliveryQ(), SCTPAssociation(), sendInit(), sendOnPath(), stateEntered(), storePacket(), and tsnWasReneged().
SCTPReceiveStreamMap SCTPAssociation::receiveStreams [protected] |
Referenced by deleteStreams(), initStreams(), makeRoomForTsn(), processDataArrived(), pushUlp(), and putInDeliveryQ().
Referenced by SCTP::addLocalAddress(), createSack(), performStateTransition(), printConnBrief(), process_CLOSE(), process_RCV_Message(), process_SEND(), process_STATUS(), processAppCommand(), processDataArrived(), processInitAckArrived(), processInitArrived(), processSackArrived(), processSCTPMessage(), pushUlp(), retransmitCookieEcho(), retransmitShutdown(), retransmitShutdownAck(), sendAbort(), sendEstabIndicationToApp(), sendIndicationToApp(), sendInit(), sendInitAck(), sendShutdown(), sendToIP(), stateEntered(), and SCTP::updateSockPair().
AddressVector SCTPAssociation::remoteAddressList [protected] |
Referenced by process_ASSOCIATE(), processInitAckArrived(), processInitArrived(), sendEstabIndicationToApp(), and sendInit().
| uint16 SCTPAssociation::remotePort |
Referenced by SCTP::addLocalAddress(), SCTP::addLocalAddressToAllRemoteAddresses(), SCTP::addRemoteAddress(), printConnBrief(), process_ASSOCIATE(), processSCTPMessage(), SCTP::removeLocalAddressFromAllRemoteAddresses(), SCTP::removeRemoteAddressFromAllConnections(), SCTPAssociation(), sendAbort(), sendCookieAck(), sendCookieEcho(), sendEstabIndicationToApp(), sendHeartbeat(), sendHeartbeatAck(), sendInit(), sendInitAck(), sendShutdown(), sendShutdownAck(), sendShutdownComplete(), sendToIP(), stateEntered(), and SCTP::updateSockPair().
SCTPQueue* SCTPAssociation::retransmissionQ [protected] |
uint32 SCTPAssociation::sackFrequency [protected] |
Referenced by process_RCV_Message(), processDataArrived(), scheduleSack(), stateEntered(), and timeForSack().
double SCTPAssociation::sackPeriod [protected] |
Referenced by scheduleSack(), SCTPAssociation(), and stateEntered().
| cMessage* SCTPAssociation::SackTimer |
SCTPAlgorithm* SCTPAssociation::sctpAlgorithm [protected] |
Referenced by cloneAssociation(), initAssociation(), process_SEND(), processTimer(), SCTPAssociation(), sendOnPath(), and ~SCTPAssociation().
SCTP* SCTPAssociation::sctpMain [protected] |
Referenced by cloneAssociation(), dequeueAckedChunks(), handleChunkReportedAsMissing(), initAssociation(), pathStatusIndication(), performStateTransition(), pmDataIsSentOn(), pmRttMeasurement(), pmStartPathManagement(), process_ASSOCIATE(), process_OPEN_PASSIVE(), process_RCV_Message(), process_SEND(), process_TIMEOUT_HEARTBEAT(), process_TIMEOUT_HEARTBEAT_INTERVAL(), process_TIMEOUT_INIT_REXMIT(), process_TIMEOUT_RTX(), process_TIMEOUT_SHUTDOWN(), processCookieEchoArrived(), processDataArrived(), processHeartbeatAckArrived(), processInitAckArrived(), processInitArrived(), processTimer(), SCTPAssociation(), sendEstabIndicationToApp(), sendIndicationToApp(), sendInit(), sendInitAck(), sendOnPath(), sendToApp(), sendToIP(), stateEntered(), and updateCounters().
SCTPPathMap SCTPAssociation::sctpPathMap [protected] |
Referenced by addPath(), allPathsInactive(), choosePathForRetransmission(), cwndUpdateAfterSack(), getNextPath(), getOutstandingBytes(), pmStartPathManagement(), printSctpPathMap(), processInitAckArrived(), processInitArrived(), processSackArrived(), SCTP::removeAssociation(), removePath(), sendInit(), sendOnAllPaths(), stopTimers(), and updateFastRecoveryStatus().
cOutVector* SCTPAssociation::sendQueue [protected] |
Referenced by dequeueOutboundDataMsg(), process_SEND(), SCTPAssociation(), stateEntered(), and ~SCTPAssociation().
SCTPSendStreamMap SCTPAssociation::sendStreams [protected] |
SSFunctions SCTPAssociation::ssFunctions [protected] |
uint16 SCTPAssociation::ssModule [protected] |
Referenced by SCTPAssociation().
| cMessage* SCTPAssociation::StartTesting |
Referenced by SCTPAssociation().
SCTPStateVariables* SCTPAssociation::state [protected] |
Referenced by advanceCtsna(), bytesAllowedToSend(), calculateBytesToSendOnPath(), cloneAssociation(), createSack(), cwndUpdateAfterSack(), cwndUpdateMaxBurst(), decreaseOutstandingBytes(), dequeueAckedChunks(), dequeueOutboundDataMsg(), getNextDestination(), handleChunkReportedAsMissing(), increaseOutstandingBytes(), initAssociation(), initCCParameters(), initStreams(), loadPacket(), makeRoomForTsn(), makeVarFromMsg(), moveChunkToOtherPath(), nextChunkFitsIntoPacket(), performStateTransition(), pmDataIsSentOn(), pmStartPathManagement(), process_ABORT(), process_ASSOCIATE(), process_CLOSE(), process_OPEN_PASSIVE(), process_PRIMARY(), process_QUEUE_BYTES_LIMIT(), process_QUEUE_MSGS_LIMIT(), process_RCV_Message(), process_RECEIVE_REQUEST(), process_SEND(), process_TIMEOUT_HEARTBEAT(), process_TIMEOUT_INIT_REXMIT(), process_TIMEOUT_RTX(), process_TIMEOUT_SHUTDOWN(), processCookieAckArrived(), processCookieEchoArrived(), processDataArrived(), processHeartbeatAckArrived(), processInitAckArrived(), processInitArrived(), processSackArrived(), processTimer(), pushUlp(), putInDeliveryQ(), removeFromGapList(), retransmitCookieEcho(), retransmitInit(), retransmitShutdown(), retransmitShutdownAck(), scheduleSack(), SCTPAssociation(), sendCookieEcho(), sendEstabIndicationToApp(), sendInit(), sendInitAck(), sendOnPath(), sendSack(), sendShutdown(), sendShutdownAck(), stateEntered(), storePacket(), streamScheduler(), timeForSack(), tsnIsDuplicate(), updateCounters(), updateGapList(), and ~SCTPAssociation().
int32 SCTPAssociation::status [protected] |
| cMessage* SCTPAssociation::T1_InitTimer |
| cMessage* SCTPAssociation::T2_ShutdownTimer |
| cMessage* SCTPAssociation::T5_ShutdownGuardTimer |
SCTPQueue* SCTPAssociation::transmissionQ [protected] |