INtime SDK Help
Message Queues
INtime SDK v6 > About INtime > INtime Kernel > Message Queues

Overview

Message queues provide the means to transfer data messages of different lengths between processes residing on the same or on different nodes. Messages can be sent as short messages – where the data is stored directly in the queue - or as long messages which requires a handshake between sender and receiver. The boundary between short and long messages is defined by the user when the queue object is created.

A sending thread can choose to send either short or long messages. When sending short messages, the call returns as soon as the data is copied into the queue. When sending long messages, a header is written to the queue and the call does not return until the receiver thread copies the data or the timeout expires.

A receiving thread pulls messages directly from the queue. When the next available message is a long data message, the receiver can directly fetch the long message or assign a worker thread to do so.

Message queues are created with a specified amount of memory so when calls sending data would overflow the queue, the call fails rather than consuming more system memory.

Message Queues and Data Mailboxes

Message queues have similar functionality to data mailboxes but overcome some of the drawbacks of those objects.

1. Data mailboxes have a fixed maximum message size of 128 bytes, message queues have the maximum (short) message size fixed when it is created.

2. If necessary, occasional messages which are bigger than the maximum defined may be sent via a message queue (long messages).

3. Messages queues are a fixed resource with the maximum size fixed and allocated when you create the object. Data mailboxes will expand to accept more messages by allocating memory from the root process until all of the system memory is consumed.

4. Message queues have an optimized implementation when sending messages between different INtime nodes on the same host, or between Windows and INtime nodes.

Example of usage

In this usage sample, two systems are involved in a message exchange. NodeA will receive messages from NodeB. Both NodeA and NodeB are INtime systems with no dependency on any Windows system.

1. On NodeA, a queue object is created and cataloged. A thread then waits on the queue object for messages to arrive.

Creating a message queue
Copy Code
RTHANDLE qh = CreateRtQueue(4096, 256, 0); // 256-byte message max size, 4096 bytes total size
CatalogRtHandle(root, qh, “MyQueue”);
while (1) {
    BYTE msgbuf[256];
    DWORD msgSize;
    BOOLEAN b;
    b = ReceiveRtDataMessage(qh, msgbuf, sizeof(msgbuf), WAIT_FOREVER, &msgSize);

2. NodeB first discovers the location parameter for NodeA then looks up the queue object.

Lookup queue object
Copy Code
LOCATION loc;
RTHANDLE remoteQh;
loc = GetRtNodeLocationByName(“NodeA”);
if (GetRtNodeStatus(loc) != E_OK)
    // handle error condition …
root = GetRemoteRootRtProcess(loc);
remoteQh = LookupRtHandle(root, “MyQueue”, WAIT_FOREVER);

3. NodeB sends a short message

Send a message
Copy Code
msglen = 10;
SendRtShortDataMessage(remoteQh, msg, msglen);

4. NodeA processes the message

Handle received message
Copy Code
b = ReceiveRtDataMessage(qh, msgbuf, sizeof(msgbuf), WAIT_FOREVER, &msgSize);
if (b) {
    // process the message in buffer
    printf(“received short message of %u bytes\n”, msgSize);
}

5. NodeB sends a long message

Send long message
Copy Code

msglen = 400;
SendRtLongDataMessage(remoteQh, msg, msglen, WAIT_FOREVER);

6. NodeA processes the message

Handle long message
Copy Code
b = ReceiveRtDataMessage(qh, msgbuf, sizeof(msgbuf), WAIT_FOREVER, &msgSize);
if (b) {
    …
}
else {
    st = GetLastRtError();
    if (st == E_NO_LOCAL_BUFFER) { 
        MSG_DESCRIPTOR *msgdesc = (MSG_DESCRIPTOR*)&msgbuf[0];
        BYTE *longbuffer = (BYTE *)malloc(msgSize);
        if (GetRtLongDataMessage(qh, msgdesc, longbuffer, msgSize, &msgSize))
            printf(“Received long message of %u bytes\n”, msgSize);
    }
}

System calls

This lists common operations on message queues and the message queue system calls that do the operations:

To . . . Use this system call . . .
Create a message queue CreateRtQueue
ntxCreateRtQueue
Delete a message queue DeleteRtQueue
ntxDeleteRtQueue
Get information about a message queue GetRtQueueInfo
ntxGetRtQueueInfo
Send short data message SendRtShortDataMessage
ntxSendRtShortDataMessage
Send long data message SendRtLongDataMessage
ntxSendRtLongDataMessage
Receive a data message ReceiveRtDataMessage
ntxReceiveRtDataMessage
Get a long data message GetRtLongDataMessage
ntxGetRtLongDataMessage
Cancel a long data message CancelRtLongDataMessage
ntxCancelRtLongDataMessage
Flush a queue FlushRtQueue
ntxFlushRtQueue

 

See Also

Mailboxes