Message queues transfer data messages of varying lengths between processes residing on the same node or on different nodes. Messages can be sent as short messages, where the data is stored directly in the queue, or as long messages, which require a handshake between sender and receiver. The user defines the boundary between short and long messages when the queue object is created.
A sending thread can choose to send either short or long messages. When sending short messages, the call returns as soon as the data is copied into the queue. When sending long messages, a header is written to the queue and the call does not return until the receiver thread copies the data or the timeout expires.
A receiving thread pulls messages directly from the queue. When the next available message is a long data message, the receiver can directly fetch the long message or assign a worker thread to do so.
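The difference between the two send modes can be sketched with a small, single-threaded model in plain C. It is not the INtime implementation and uses none of its calls: the `Slot` type, the `MAX_SHORT` limit of 8 bytes, and the functions `send_short`, `send_long`, `slot_receive`, and `fetch_long` are all hypothetical names chosen for illustration. A short send copies the payload into the queue slot. A long send stores only a header that points at the sender's buffer, and the sender is released once the receiver has copied the data.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define MAX_SHORT 8   /* hypothetical short/long boundary, fixed at creation */

typedef enum { SLOT_EMPTY, SLOT_SHORT, SLOT_LONG } SlotKind;

/* One queue slot: either the payload itself (short) or a header
 * describing where the payload still lives (long). */
typedef struct {
    SlotKind kind;
    size_t len;
    unsigned char inline_data[MAX_SHORT]; /* payload of a short message */
    const void *sender_buf;               /* long message: sender's buffer */
    int *sender_done;                     /* long message: set when copied */
} Slot;

/* Short send: copy the payload into the slot and return at once. */
int send_short(Slot *s, const void *msg, size_t len) {
    if (s->kind != SLOT_EMPTY || len > MAX_SHORT) return 0;
    memcpy(s->inline_data, msg, len);
    s->len = len;
    s->kind = SLOT_SHORT;
    return 1;
}

/* Long send: write only a header. A real sender would now block,
 * with a timeout, until *done becomes nonzero. */
int send_long(Slot *s, const void *msg, size_t len, int *done) {
    if (s->kind != SLOT_EMPTY) return 0;
    *done = 0;
    s->sender_buf = msg;
    s->sender_done = done;
    s->len = len;
    s->kind = SLOT_LONG;
    return 1;
}

/* Receive: report what is in the slot. A short payload is copied out
 * immediately; a long message stays in place until fetch_long runs. */
SlotKind slot_receive(Slot *s, void *buf, size_t buflen, size_t *len) {
    SlotKind kind = s->kind;
    if (kind == SLOT_EMPTY) return kind;
    *len = s->len;
    if (kind == SLOT_SHORT) {
        assert(s->len <= buflen);
        memcpy(buf, s->inline_data, s->len);
        s->kind = SLOT_EMPTY;
    }
    return kind;
}

/* Fetch: copy the long payload from the sender's buffer, then release
 * the sender by setting its completion flag. */
int fetch_long(Slot *s, void *buf, size_t buflen) {
    if (s->kind != SLOT_LONG || buflen < s->len) return 0;
    memcpy(buf, s->sender_buf, s->len);
    *s->sender_done = 1;
    s->kind = SLOT_EMPTY;
    return 1;
}
```

In this model the return value of `slot_receive` tells the receiver whether it still has to call `fetch_long`, which loosely mirrors the receive-then-fetch sequence described above. The fetch could equally be handed to another thread, since the header stays in the slot until it is serviced.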
Message queues are created with a specified amount of memory. When a send call would overflow the queue, the call fails rather than consuming more system memory.
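The fixed-memory behavior can be illustrated with a toy byte queue in plain C. This is only a sketch under assumed values: `FixedQueue`, `QUEUE_BYTES` (64), `SHORT_LIMIT` (16), the one-byte length prefix per message, and the `fq_*` functions are hypothetical and say nothing about how INtime lays out its queues. The point it shows is that a send which does not fit is rejected, and space becomes available again only when a message is received.

```c
#include <assert.h>
#include <stddef.h>

#define QUEUE_BYTES 64   /* hypothetical total queue size, fixed at creation */
#define SHORT_LIMIT 16   /* hypothetical maximum short-message size */

typedef struct {
    unsigned char store[QUEUE_BYTES]; /* all the memory the queue will ever use */
    size_t head;                      /* index of the oldest stored byte */
    size_t used;                      /* number of bytes currently occupied */
} FixedQueue;

void fq_init(FixedQueue *q) {
    q->head = 0;
    q->used = 0;
}

/* Append one byte at the tail, wrapping around the end of the store. */
static void fq_put(FixedQueue *q, unsigned char b) {
    q->store[(q->head + q->used) % QUEUE_BYTES] = b;
    q->used++;
}

/* Remove and return the oldest byte. */
static unsigned char fq_take(FixedQueue *q) {
    unsigned char b = q->store[q->head];
    q->head = (q->head + 1) % QUEUE_BYTES;
    q->used--;
    return b;
}

/* Returns 1 on success. Returns 0, leaving the queue unchanged, if the
 * message exceeds SHORT_LIMIT or would not fit in the remaining space. */
int fq_send(FixedQueue *q, const void *msg, size_t len) {
    const unsigned char *p = msg;
    if (len > SHORT_LIMIT) return 0;
    if (q->used + 1 + len > QUEUE_BYTES) return 0; /* 1 prefix byte + payload */
    fq_put(q, (unsigned char)len);
    for (size_t i = 0; i < len; i++) fq_put(q, p[i]);
    return 1;
}

/* Copies the oldest message into buf and returns its length,
 * or returns -1 if the queue is empty. */
int fq_receive(FixedQueue *q, void *buf, size_t buflen) {
    unsigned char *out = buf;
    size_t len;
    if (q->used == 0) return -1;
    len = q->store[q->head];
    assert(len <= buflen);
    (void)fq_take(q);                 /* drop the length prefix */
    for (size_t i = 0; i < len; i++) out[i] = fq_take(q);
    return (int)len;
}
```

With these values three 16-byte messages fit (17 bytes each including the prefix). A fourth is refused until the receiver removes one.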
Message queues offer functionality similar to data mailboxes but overcome some of the drawbacks of those objects:
1. Data mailboxes have a fixed maximum message size of 128 bytes; a message queue's maximum (short) message size is set when the queue is created.
2. If necessary, occasional messages that are bigger than the defined maximum may be sent via a message queue as long messages.
3. Message queues are a fixed resource: the maximum size is fixed and allocated when you create the object. Data mailboxes expand to accept more messages by allocating memory from the root process until all of the system memory is consumed.
4. Message queues have an optimized implementation when sending messages between different INtime nodes on the same host, or between Windows and INtime nodes.
In this usage sample, two systems are involved in a message exchange. NodeA will receive messages from NodeB. Both NodeA and NodeB are INtime systems with no dependency on any Windows system.
1. On NodeA, a queue object is created and cataloged. A thread then waits on the queue object for messages to arrive.
Creating a message queue:

    RTHANDLE qh = CreateRtQueue(4096, 256, 0);  // 256-byte message max size, 4096 bytes total size
    CatalogRtHandle(root, qh, "MyQueue");       // root: local root process handle, obtained elsewhere
    while (1) {
        BYTE msgbuf[256];
        DWORD msgSize;
        BOOLEAN b;

        b = ReceiveRtDataMessage(qh, msgbuf, sizeof(msgbuf), WAIT_FOREVER, &msgSize);
        // handle the received message (see steps 4 and 6)
    }
2. NodeB first discovers the location parameter for NodeA, then looks up the queue object.
Lookup queue object:

    LOCATION loc;
    RTHANDLE root, remoteQh;

    loc = GetRtNodeLocationByName("NodeA");
    if (GetRtNodeStatus(loc) != E_OK) {
        // handle error condition …
    }
    root = GetRemoteRootRtProcess(loc);
    remoteQh = LookupRtHandle(root, "MyQueue", WAIT_FOREVER);
3. NodeB sends a short message.
Send a message:

    msglen = 10;
    SendRtShortDataMessage(remoteQh, msg, msglen);
4. NodeA processes the message.
Handle received message:

    b = ReceiveRtDataMessage(qh, msgbuf, sizeof(msgbuf), WAIT_FOREVER, &msgSize);
    if (b) {
        // process the message in buffer
        printf("received short message of %u bytes\n", msgSize);
    }
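5. NodeB sends a long message.
Send long message:

    msglen = 400;
    // The call itself is absent from the original listing. The argument list
    // here is an assumption modeled on the short-message call, with the
    // timeout described earlier as the last argument.
    SendRtLongDataMessage(remoteQh, msg, msglen, WAIT_FOREVER);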
6. NodeA processes the long message.
Handle long message:

    b = ReceiveRtDataMessage(qh, msgbuf, sizeof(msgbuf), WAIT_FOREVER, &msgSize);
    if (b) {
        …
    } else {
        st = GetLastRtError();
        if (st == E_NO_LOCAL_BUFFER) {
            // msgbuf is read as a descriptor of the long message, not its data
            MSG_DESCRIPTOR *msgdesc = (MSG_DESCRIPTOR *)&msgbuf[0];
            BYTE *longbuffer = (BYTE *)malloc(msgSize);

            if (longbuffer != NULL) {
                if (GetRtLongDataMessage(qh, msgdesc, longbuffer, msgSize, &msgSize))
                    printf("Received long message of %u bytes\n", msgSize);
                free(longbuffer);  // release the buffer once the message is handled
            }
        }
    }
The following table lists common operations on message queues and the system calls that perform them:
To ... | Use this system call ... |
---|---|
Create a message queue | CreateRtQueue, ntxCreateRtQueue |
Delete a message queue | DeleteRtQueue, ntxDeleteRtQueue |
Get information about a message queue | GetRtQueueInfo, ntxGetRtQueueInfo |
Send a short data message | SendRtShortDataMessage, ntxSendRtShortDataMessage |
Send a long data message | SendRtLongDataMessage, ntxSendRtLongDataMessage |
Receive a data message | ReceiveRtDataMessage, ntxReceiveRtDataMessage |
Get a long data message | GetRtLongDataMessage, ntxGetRtLongDataMessage |
Cancel a long data message | CancelRtLongDataMessage, ntxCancelRtLongDataMessage |
Flush a queue | FlushRtQueue, ntxFlushRtQueue |