XRootD
Loading...
Searching...
No Matches
XrdSendQ Class Reference

#include <XrdSendQ.hh>

+ Inheritance diagram for XrdSendQ:
+ Collaboration diagram for XrdSendQ:

Public Member Functions

 XrdSendQ (XrdLink &lP, XrdSysMutex &mP)
 
unsigned int Backlog ()
 
virtual void DoIt ()
 
int Send (const char *buff, int blen)
 
int Send (const struct iovec *iov, int iovcnt, int iotot)
 
void Terminate (XrdLink *lP=0)
 
- Public Member Functions inherited from XrdJob
 XrdJob (const char *desc="")
 
virtual ~XrdJob ()
 

Static Public Member Functions

static void SetAQ (bool onoff)
 
static void SetQM (unsigned int qmVal)
 
static void SetQW (unsigned int qwVal)
 

Additional Inherited Members

- Public Attributes inherited from XrdJob
const char * Comment
 
XrdJob * NextJob
 

Detailed Description

Definition at line 42 of file XrdSendQ.hh.

Constructor & Destructor Documentation

◆ XrdSendQ()

XrdSendQ::XrdSendQ ( XrdLink & lP,
XrdSysMutex & mP )

Definition at line 91 of file XrdSendQ.cc.

92 : XrdJob("sendQ runner"),
93 mLink(lP), wMutex(mP),
94 fMsg(0), lMsg(0), delQ(0), theFD(lP.FDnum()),
95 inQ(0), qWmsg(qWarn), discards(0),
96 active(false), terminate(false) {}
XrdJob(const char *desc="")
Definition XrdJob.hh:51

References XrdJob::XrdJob().

+ Here is the call graph for this function:

Member Function Documentation

◆ Backlog()

unsigned int XrdSendQ::Backlog ( )
inline

Definition at line 46 of file XrdSendQ.hh.

46{return inQ;}

◆ DoIt()

void XrdSendQ::DoIt ( )
virtual

Implements XrdJob.

Definition at line 102 of file XrdSendQ.cc.

103{
104 mBuff *theMsg;
105 int myFD, rc;
106 bool theEnd;
107
108// Obtain the lock
109//
110 wMutex.Lock();
111
112// Before we start check if we should delete any messages
113//
114 if (delQ) {RelMsgs(delQ); delQ = 0;}
115
116// Send all queued messages (we can use a blocking send here)
117//
118 while(!terminate && (theMsg = fMsg))
119 {if (!(fMsg = fMsg->next)) lMsg = 0;
120 inQ--; myFD = theFD;
121 wMutex.UnLock();
122 rc = send(myFD, theMsg->mData, theMsg->mLen, 0);
123 free(theMsg);
124 wMutex.Lock();
125 if (rc < 0) {Scuttle(); break;}
126 }
127
128// Before we exit check if we should delete any messages
129//
130 if (delQ) {RelMsgs(delQ); delQ = 0;}
131 if ((theEnd = terminate) && fMsg) RelMsgs(fMsg);
132 active = false;
133 qWmsg = qWarn;
134
135// Release any messages that need to be released. Note that we may have been
136// deleted at this point so we cannot reference anything via "this" once we
137// unlock the mutex. We may also need to delete ourselves.
138//
139 wMutex.UnLock();
140 if (theEnd) delete this;
141}

◆ Send() [1/2]

int XrdSendQ::Send ( const char * buff,
int blen )

Definition at line 230 of file XrdSendQ.cc.

231{
232 mBuff *theMsg;
233 int bleft, bsent;
234
235// If there is an active thread handling messages then we have to queue it.
236// Otherwise try to send it. We need to hold the lock here to prevent messing
237// up the message if only part of it could be sent. This is a non-blocking call.
238//
239 if (active) bleft = blen;
240 else if ((bleft = SendNB(buff, blen)) <= 0) return (bleft ? -1 : blen);
241
242// Allocate buffer for the message
243//
244 if (!(theMsg = (mBuff *)malloc(sizeof(mBuff) + bleft)))
245 {errno = ENOMEM; return -1;}
246
247// Copy the unsent message fragment
248//
249 bsent = blen - bleft;
250 memcpy(theMsg->mData, buff+bsent, bleft);
251 theMsg->mLen = bleft;
252
253// Queue the message.
254//
255 return (QMsg(theMsg) ? blen : -1);
256}

◆ Send() [2/2]

int XrdSendQ::Send ( const struct iovec * iov,
int iovcnt,
int iotot )

Definition at line 262 of file XrdSendQ.cc.

263{
264 mBuff *theMsg;
265 char *body;
266 int bleft, bmore, iovX;
267
268// If there is an active thread handling messages then we have to queue it.
269// Otherwise try to send it. We need to hold the lock here to prevent messing
270// up the message if only part of it could be sent. This is a non-blocking call.
271//
272 if (active)
273 {bleft = 0;
274 for (iovX = 0; iovX < iovcnt; iovX++)
275 if ((bleft = iov[iovX].iov_len)) break;
276 if (!bleft) return iotot;
277 } else {
278 if ((bleft = SendNB(iov, iovcnt, iotot, iovX)) <= 0)
279 return (bleft ? -1 : 0);
280 }
281
282// Readjust the total amount not sent based on where we stopped in the iovec.
283//
284 bmore = bleft;
285 for (int i = iovX+1; i < iovcnt; i++) bmore += iov[i].iov_len;
286
287// Copy the unsent message (the unsent tail of the stop element plus all later elements).
288//
289 if (!(theMsg = (mBuff *)malloc(bmore+sizeof(mBuff))))
290 {errno = ENOMEM; return -1;}
291
292// Setup the message length
293//
294 theMsg->mLen = bmore;
295
296// Copy the first fragment (it cannot be zero length)
297//
298 body = theMsg->mData;
299 memcpy(body, ((char *)iov[iovX].iov_base)+(iov[iovX].iov_len-bleft), bleft);
300 body += bleft;
301
302// All remaining items
303//
304 for (int i = iovX+1; i < iovcnt; i++)
305 {if (iov[i].iov_len)
306 {memcpy(body, iov[i].iov_base, iov[i].iov_len);
307 body += iov[i].iov_len;
308 }
309 }
310
311// Queue the message.
312//
313 return (QMsg(theMsg) ? iotot : 0);
314}

◆ SetAQ()

static void XrdSendQ::SetAQ ( bool onoff)
inline static

Definition at line 54 of file XrdSendQ.hh.

54{qPerm = onoff;}

◆ SetQM()

static void XrdSendQ::SetQM ( unsigned int qmVal)
inlinestatic

Definition at line 56 of file XrdSendQ.hh.

56{qMax = qmVal;}

Referenced by XrdCmsConfig::Configure1().

+ Here is the caller graph for this function:

◆ SetQW()

static void XrdSendQ::SetQW ( unsigned int qwVal)
inline static

Definition at line 58 of file XrdSendQ.hh.

58{qWarn = qwVal;}

◆ Terminate()

void XrdSendQ::Terminate ( XrdLink * lP = 0)

Definition at line 396 of file XrdSendQ.cc.

397{
398// First step is to see if we need to schedule a shutdown prior to quitting
399//
400 if (lP) Sched.Schedule((XrdJob *)new LinkShutdown(lP));
401
402// If there is an active thread then we need to let the thread handle the
403// termination of this object. Otherwise, we can do it now.
404//
405 if (active)
406 {Scuttle();
407 terminate = true;
408 theFD =-1;
409 } else {
410 if (fMsg) {RelMsgs(fMsg); fMsg = lMsg = 0;}
411 if (delQ) {RelMsgs(delQ); delQ = 0;}
412 delete this;
413 }
414}
void Schedule(XrdJob *jp)
XrdScheduler Sched
Definition XrdLinkCtl.cc:54

References XrdJob::XrdJob(), and XrdGlobal::Sched.

+ Here is the call graph for this function:

The documentation for this class was generated from the following files: