Page: 1 2 3 4 5 6 7 8 9 10

Chapter 7


STREAMS Modules

7.1 INTRODUCTION

A STREAMS module is a pair of queues and a defined set of kernel-level routines and data structures used to process data, status, and control information. A Stream may have zero or more modules. User processes push (insert) modules on a Stream using the I_PUSH ioctl and pop (remove) them using the I_POP ioctl. Pushing and popping of modules happens in a Last-In-First-Out (LIFO) fashion. Modules manipulate messages as they flow through the Stream.

7.1.1 Routines

STREAMS modules (such as open, close, put, and service) have already been described in the previous sections. This section shows some examples and further describes attributes common to module put and service routines.

A module's put routine is called by the preceding module, driver, or Stream head and before the corresponding service routine. The put routine should do any processing that needs to be done immediately (for example, processing of high-priority messages). Any processing that can be deferred should be left for the corresponding service routine.

The service routine implements flow control, handles depacketization of messages, performs deferred processing, and handles resource allocation. Once the service routine is enabled,

The put and service routines must not call sleep and cannot access the u_area area, because they are executed asynchronously with respect to any process.

Figure 7-1 shows a STREAMS module read-side put routine:

static int modrput(queue_t *q, mblk_t *mp)

{

struct mod_prv *modptr;

modptr = (struct mod_prv *) q->q_ptr; /* for state information */

#ifdef MT

if (pcmsg(mp->b_datap->db_type)) { /* process priority message */

#else

if (mp->b_datap->db_type >= QPCTL) { /* process priority message */

#endif

putnext(q, mp); /* and pass it on */

return;

}

switch(mp->b_datap->db_type) {

case M_DATA: /* may process message data */

putq(q, mp); /* queue message for service routine */

return;

case M_PROTO: /* handle protocol control message */

.

.

.

default:

putnext(q, mp);

return;

}

}

Figure 7-1 Read-Side put Procedure

The following briefly describes the code:

Home


  

Figure 7-2 shows a module write-side put routine.

static int modwput(queue_t *q, mblk_t *mp)

{

struct mod_prv *modptr;

modptr = (struct mod_prv *) q->q_ptr; /* for state information */

#ifdef MT

if (pcmsg(mp->b_datap->db_type)) { /* process priority message */

#else

if (mp->b_datap->db_type >= QPCTL) { /* process priority message */

#endif

putnext(q, mp); /* and pass it on */

return;

}

switch(mp->b_datap->db_type) {

case M_DATA: /* may process message data */

putq(q, mp); /* queue message for service routine */

/* or pass message along */

/* putnext(q, mp); */

return;

case M_PROTO:

.

.

.

case M_IOCTL: /* if command in message is recognized */

/* process message and send back reply */

/* else pass message downstream */

default:

putnext(q, mp);

return;

}

}

Figure 7-2 Write-Side put Procedure

The write-side put routine, unlike the read-side, may be passed M_IOCTL messages. It is up to the module to recognize and process the ioctl command, or pass the message downstream if it does not recognize the command.

Home


  

Figure 7-3 shows a general scenario employed by the module's service routine.

static int modrsrv(queue_t *q)

{

mblk_t *mp;

while ((mp = getq(q)) != (mblk_t *) NULL) {

#ifdef MT

if (!pcmsg(mp->b_datap->db_type) &&

!canputnext(q)) { /* flow control check */

#else

if (!(mp->b_datap->db_type >= QPCTL) &&

!canput(q->q_next)) { /* flow control check */

#endif

putbq(q, mp); /* return message */

return;

}

/* process the message */

switch(mp->b_datap->db_type) {

.

.

.

putnext(q, mp); /* pass the result */

}

} /* while */

}

Figure 7-3 service Routine

The steps are as follows:

  1. Retrieve the first message from the queue using getq.

  2. If the message is high priority, process it immediately, and pass it along the Stream.

  3. If the message is not high priority, the service routine should use the routine to determine if the next module or driver that enqueues messages is within acceptable flow control limits. goes down the Stream (or up on the read-side) until it reaches a module, a driver, or the Stream head with a service routine. When it reaches one, it looks at the total message space currently allocated at that queue for enqueued messages. If the amount of space currently used at that queue exceeds the high-water mark, returns false (zero). If the next queue with a service routine is within acceptable flow control limits, it returns true (nonzero).

  4. If returns false, the service routine should return the message to its own queue using the putbq routine. The service routine can do no further processing now, and it should return.

  5. If returns true, the service routine should complete any processing of the message. This may involve retrieving more messages from the queue, (de)allocating header and trailer information, and performing the control function for the module.

  6. When the service routine is finished processing the message, it may call the putnext routine to pass the resulting message to the next queue.

  7. The previous steps are repeated until there are no messages left on the queue (that is, until getq returns NULL) or returns false.

Home


  

7.1.2 Filter Module Example

The module shown in Figure 7-4, crmod, is an asymmetric filter. On the write-side, newline is converted to carriage return followed by newline. On the read-side, no conversion is done. The declarations of this module are the same as those of the null module presented in the previous section.

/* Simple filter - converts newline -> carriage return, newline */

#include <sys/types.h>

#include <sys/param.h>

#include <sys/stream.h>

#include <sys/stropts.h>

static struct module_info minfo = { 0x09, "crmod", 0, INFPSZ, 512, 128 };

static int modopen, modrput, modwput, modwsrv, modclose;

static struct qinit rinit = {

modrput, NULL, modopen, modclose, NULL, &minfo, NULL };

static struct qinit winit = {

modwput, modwsrv, NULL, NULL, NULL, NULL, &minfo, NULL };

struct streamtab crmdinfo = { &rinit, &winit, NULL, NULL };

#ifdef MT

int moddevflag = D_MP;

#else

int moddevflag = 0;

#endif

Figure 7-4 Filter Module

stropts.h includes definitions of flush message options common to user level, modules and drivers. modopen and modclose are unchanged from the null module example shown earlier in this chapter. modrput is like modput from the null module.

Note that, in contrast to the null module example, a single module_info structure is shared by the read-side and write-side. The module_info includes the flow control high- and low-water marks (512 and 128) for the write queue. (Although the same module_info is used on the read queue side, the read-side has no service procedure, so flow control is not used.) The qinit contains the service procedure pointer.

Home


  

The write-side put procedure, the beginning of the service procedure, and an example of flushing a queue are shown in Figure 7-5.

static int modwput(queue_t *q, mblk_t *mp)

{

#ifdef MT

if (pcmsg(mp->b_datap->db_type) && mp->b_datap->db_type != M_FLUSH)

#else

if (mp->b_datap->db_type >= QPCTL && mp->b_datap->db_type != M_FLUSH)

#endif

putnext(q, mp);

else

putq(q, mp); /* Put it on the queue */

}

static int modwsrv(queue_t *q)

{

mblk_t *mp;

while ((mp = getq(q)) != NULL) {

switch (mp->b_datap->db_type) {

default:

#ifdef MT

if (canputnext(q)) {

#else

if (canput(q->q_next)) {

#endif

putnext(q, mp);

break;

} else {

putbq(q, mp);

return;

}

case M_FLUSH:

if (*mp->b_rptr & FLUSHW)

flushq(q, FLUSHDATA);

putnext(q, mp);

break;

Figure 7-5 Write Side put Procedure and Queue Flush

Home


  

modwput, the write put procedure, switches on the message type. High-priority messages that are not type M_FLUSH are putnext to avoid scheduling. The others are queued for the service procedure. An M_FLUSH message is a request to remove messages on one or both queues. It can be processed in either the put or service procedure; it is preferable to use the put procedure, so that M_FLUSH is handled immediately.

modwsrv is the write service procedure. It takes a single argument, a pointer to the write queue. modwsrv processes only one high-priority message, M_FLUSH. No other high-priority messages should reach modwsrv.

For an M_FLUSH message, modwsrv checks the first data byte. If FLUSHW (defined in stropts.h) is set, the write queue is flushed with the flushq utility. flushq takes two arguments, the queue pointer and a flag. The flag shows what should be flushed, data messages (FLUSHDATA) or everything (FLUSHALL). In Figure 7-6, data includes M_DATA, M_DELAY,M_PROTO, and M_PCPROTO messages. The choice of what types of messages to flush is module-specific.

If

returns false, ordinary messages are returned to the queue, indicating the downstream path is blocked. Figure 7-6 continues with the remaining part of modwsrv processing M_DATA messages.

Home


  

case M_DATA: {

mblk_t *nbp = NULL;

mblk_t *next;

#ifdef MT

if (!canputnext(q)) {

#else

if (!canput(q->q_next)) {

#endif

putbq(q, mp);

return;

}

/* Filter data, appending to queue */

for (; mp != NULL; mp = next) {

while (mp->b_rptr < mp->b_wptr) {

if (*mp->b_rptr == `\n' )

if (!bappend(&nbp, `\r'))

goto push;

if (!bappend(&nbp, *mp->b_rptr))

goto push;

mp->b_rptr++;

continue;

push:

if (nbp)

putnext(q, nbp);

nbp = NULL;

#ifdef MT

if (!canputnext(q)) {

#else

if (!canput(q->q_next)) {

#endif

if (mp->b_rptr >= mp->b_wptr) {

next = mp->b_cont;

freeb(mp);

mp=next;

}

if (mp)

putbq(q, mp);

return;

}

} /* while */

next = mp->b_cont;

freeb(mp);

} /* for */

if (nbp)

putnext(q, nbp);

} /* case M_DATA */

} /* switch */

} /* while */

}

Figure 7-6 M_DATA Message Processing

The differences in M_DATA processing between this and the example in Section 5.5 relate to the way the new messages are forwarded and flow controlled. To show alternative means of processing messages, this version creates individual new messages rather than a single message containing multiple message blocks. When a new message block is full, it is immediately forwarded with the putnext routine rather than being linked into a single, large message (as was done in the example). This alternative may not be desirable because message boundaries are altered and there is an additional overhead of handling and scheduling multiple messages.

When the filter processing is performed (following push), should check flow control after,

rather than before, each new message is forwarded. This is because there is no provision to hold the new message until the queue becomes unblocked. If the downstream path is blocked, the remaining part of the original message is returned to the queue. Otherwise, processing continues.

Home


  

7.2 FLOW CONTROL

To use the STREAMS flow control mechanism, modules must use service procedures, invoke before calling putnext, and use appropriate values for the high- and low-water marks.

Module flow control limits the amount of data that can be placed on a queue. It prevents depletion of buffers in the buffer pool. Flow control is advisory in nature and can be bypassed. It is managed by high- and low-water marks and regulated by QWANTW and QFULL flags. Module flow control is implemented by using the , getq, putq, putbq, insq, and rmvq routines.

During normal flow control, when a module and driver are in sync, the following steps are taken:

  1. A driver sends data to a module using the putnext routine.

  2. The module's put procedure queues data using putq.

  3. The putq routine increments the module's q_count by the number of bytes in the message and enables the service procedure.

  4. When STREAMS scheduling runs the service procedure, the service procedure retrieves the data by calling the getq utility.

  5. getq decrements q_count by an appropriate value.

If the module cannot process data at the rate at which the driver is sending the data, the following steps occur:

  1. The module's q_count goes above its high-water mark, and the QFULL flag is set by putq.

  2. The driver's fails, and sets QWANTW flag in the module's queue.

  3. The driver sends a command to the device to either stop input, queue the data in its own queue, or drop the data.

  4. The module's q_count falls below its low-water mark because of getq.

  5. getq finds the nearest back queue with a service procedure and enables it.

  6. The scheduler runs the service procedure.

The procedure for banded data is the same, except that qb_count is used in place of q_count.

NOTE

Flow control does not prevent exceeding q_hiwat on a given queue. Flow control may exceed its maximum before canput next detects QFULL and flow is stopped.

Home


  

Figure 7-7 and Figure 7-8 show a line discipline module's flow control. Figure 7-7 is a read-side line discipline module..

/* read-side line discipline module flow control */

ld_read_srv(queue_t *q)

{

mblk_t *mp; /* original message */

mblk_t *bp; /* canonicalized message */

while ((mp = getq(q)) != NULL) {

switch (mp->b_datap->db_type) { /* type of message */

case M_DATA: /* data message */

#ifdef MT

if (canputnext(q)) {

#else

if (canput(q->q_next)) {

#endif

bp = read_canon(mp);

putnext(q, bp);

} else {

putbq(q, mp); /* put message back in queue */

return;

}

break:

default:

#ifdef MT

if (pcmsg(mp->b_datap->db_type))

#else

if (mp->b_datap->db_type >= QPCTL)

#endif

putnext(q, mp); /* high priority message */

else { /* ordinary message */

#ifdef MT

if (canputnext(q))

#else

if (canput(q->q_next))

#endif

putnext(q, mp);

else {

putbq(q, mp);

return;

}

}

break;

}

}

}

Figure 7-7 Read-Side Line Discipline

Home


  

Figure 7-8 shows a write-side line discipline module.

/* write-side line discipline module flow control */

ld_write_srv(queue_t *q)

{

mlbk_t *mp; /* original message */

mblk_t *bp; /* canonicalized message */

while ((mp = getq(q)) != NULL) {

switch (mp->b_datap->db_type) { /* type of message */

case M_DATA: /* data message */

#ifdef MT

if (canputnext(q)) {

#else

if (canput(q->q_next)) {

#endif

bp = write_canon(mp);

putnext(q, bp);

} else {

putbq(q, mp);

return;

}

break;

case M_IOCTL:

ld_ioctl(q, mp);

break:

default:

#ifdef MT

if (pcmsg(mp->b_datap->db_type))

#else

if (mp->b_datap->db_type >= QPCTL)

#endif

putnext(q, mp); /* high priority message */

else { /* ordinary message */

#ifdef MT

if (canputnext(q))

#else

if (canput(q->q_next))

#endif

putnext(q, mp);

else {

putbq(q, mp);

return;

}

}

break;

}

}

}

Figure 7-8 Write-Side Line Discipline

7.3 DESIGN GUIDELINES

Module developers should follow these guidelines:


Home

Contents Previous Chapter Next Chapter Index Glossary