#75 Multi-Element Queue Send and Receive

Shane Anderson

I've been playing with FreeRTOS on some STM32F10x's lately and have really enjoyed the experience. One aspect of my projects so far has been to implement a virtual com port (over USB) as well as some real UART drivers using DMA. I find that using Queues for the RX and TX fifo's to be a natural thing to do, but when queuing chunks of character data I find that calling xQueueSend (or Receive) for each byte has too much overhead, especially when you run at really high rates, like over USB.

So, I propose some extended xQueue functions:


Along with ISR versions, that instead of a pointer to the single element to queue/dequeue, points to a consecutive array of elements. These functions will also have some sort of count argument. I've actually implemented this, so if there's interest, I'd be happy to share/post the code I wrote. What I did is added 2 counts: the minimum count and maximum count. This allowed me to wait for 1 to X characters to be in a queue, or wait for all X as desired. The return type was also modified to return the # of elements sent or received--important for knowing how many you actually got if you asked for a range of elements. In general this is similar to the unix write/read functions, with the exception that those don't provide the 'minimum' counts.

In the process, I also added prvCopyDataArrayToQueue and prvCopyDataArrayFromQueue, which was used by the above methods. These handled the larger copy as either 1 or 2 memcpy's, depending on if the copy was contiguous.

I also added checks of the pointers for NULL which would then skip the actual copy, but otherwise work the same. I did this because my UART could DMA data straight into the queue's memory buffer and the DMA ISR (and a timer based check) would simply adjust the element counts using the same functions.

Note: these functions are a superset of the existing QueueSend/Recieve, which in theory could be consolidated to save on code space (which I didn't do for performance reasons.)


  • Richard Damon
    Richard Damon

    I don't think these functions would be a useful as you might first think. It is probably better to transfer this sort of data in an external buffer, with just a pointer to the buffer put in the queue. This will normally avoid the need to copy the data into and back out of the queue. You then need a simple method (perhaps another queue) to provide these buffers to the data source which the data sink returns the buffers to when done.

    Another issue with this is how do you want to handle queue full/empty conditions. Normally these "Arrays" are a message which you really do not want intertwined with another message, so the Send routine needs to verify that there is room for the full message before starting, and locking the queue so no one else can get in the middle (this will also require disabling interrupts, to avoid a sendFromISR from breaking in). If there isn't room, then the task should wait for the queue to no longer be full, but this is a bit confusing as the queue isn't really full now.

    Similarly for the receive operation what are you going to do if their is some data but not enough? That is surely a sign of something gone wrong. Also, since presumably the arrays are of varying sizes (else you should just define the element to be that fixed sized array), how does the receiver know how much data to get? It likely is part of the data packet, but now the receiver needs to read a part of the packet and at the same time lock the queue so no one else can read it (again needing to disable the interrupts, to avoid ReceiveFromISR problems), figure out how much it needs to read, and then reading the rest and unlocking the queue.

    Much simpler would be to put on the queue a small object (like a pointer) with the location of a buffer which holds the data.

  • Shane Anderson
    Shane Anderson

    Thanks for the response Richard!

    I agree that copying stuff into a queue, just to turn around and copy it out later is wasteful. I think this is one of the challenges in using queues well and using some global buffers and queuing pointers (perhaps with an associated type/count) is definitely a win for not copying too much data, but then means those buffers cannot be used again until the receiver of the data is done with it. I'm not keen on lots of malloc/free calls myself, nor pre-allocating all sorts of memory for the worst case if I can help it. As always there are tradeoffs.

    Anyways, let me clarify the use case that I find this feature useful: serial _streams_ of bytes. My first UART driver on the STM32F for FreeRTOS was interrupt driven and would send/receive 1 char per interrupt (and non-ISR receive/send). This was a nice clean design but had a fair amount of overhead per character (which is noticeable at at 1mbaud.) So, I looked at using DMA on the UART. That's a whole 'nother set of challenges, but basically uses half/full IRQs and some reasonably sized memory buffer. These interrupts could then copy into a queue (with element size = 1 byte) but that would be wasteful. Also, you only hit the IRQ and the half and full points. i.e. when characters come in, you won't know it until these points are hit. So, I also poll the DMA registers in the timer tick to look for partial transfers. Again, copies would we wasteful, so I put the data right into the Queue's buffer! With this setup, the sends (from ISR) are free as the DMA does it. Then I read these out on 1 task (multiple readers on a serial stream doesn't make sense) and decode the stream with a little state machine (broken into lines/packets/whatever.) This read operation does perform a copy, which I wish I didn't have to do, but makes things safe for the DMA to continue on. BTW: I handle the overflow of DMA as best as I could, but it could (obviously) corrupt the data if it hasn't been read out yet.

    The cool thing about this is that I can implement a Unix like 'read' call just with:

    int read(char *buf, int len) {
    int len = xQueueReceiveArray(buffer, 1, len, ReadTimeoutTicks);
    return len == 0 ? -1 : len;

    which returns -1 on timeout (optional) or the # of bytes actually read (from 1 up to the requested amount).

    In the reverse direction (UART TX), I think it's even more compelling. You *HAVE* to copy what you're sending *somewhere*, as the strings/values/etc you might send could be on the stack. So, for this, copying a variable bytes into a TX queue is perfect. What I do there (with DMA) is if no DMA is happening, I DMA the largest contiguous chunk (from the Queue) out ASAP. DMA IRQs continue this until the queue is empty. And if a DMA was in process, the data is simply added to the queue and the DMA IRQ handler will send it later.

    In the TX case I have had multiple Senders, for that my min/max sizes come in handy--a packet must be contiguous, so I simply set min=max=packet_len, and multiple senders works.

    I do have one other example that's been useful for me--I have a streaming audio setup, where the # of samples in the incoming blocks != the DAC's DMA block size. So my queue element size is 16 bits and I Send blocks of 30, while receiving blocks of 16 in the DMA ISR. I know I could have done this with a separate buffer, but the Queue already had the logic for the buffer in it, just not the 'multiple element' aspect, which wasn't really that hard to add .

    After thinking this all through, I do really see your point though. For RX, my DMA IRQs and my timer tick DMA polls could have simply added a little structure to an Rx queue with a pointer and a count. I could then Receive these and process the character data directly where it was put by the DMA. The problem may be that I can't easily emulate the unix read() function--If an entry represents more data that I want to read I have to create a new entry and put it back into the queue to keep the unread data in the queue, but perhaps I don't need to emulate read() when I can just process the data in place..

    Thanks again for the feedback--I'll be noodling on this for a while.

  • Richard

    I can see that maybe there would be times when a multiple send would be beneficial, but I'm not sure that a comms driver would be it.

    Normally efficient drivers would use the DMA as you suggest, then use DMA interrupts to indicate when a transfer was complete or a pause in the transfer was detected. The DMA interrupt is used to give a single [normally counting] semaphore to unblock a task when data was available for processing. DMA interrupts are also used to swap buffers when a DMA buffer becomes full.

    This is how the drivers I created recently for the Atmel ASF work - you can use them as a reference: http://www.atmel.com/images/doc42049.pdf

  • Richard

    • priority: 5 --> 2
  • Richard Damon
    Richard Damon

    First let me comment that you seem to have in your mind just a single use case of the queue, that of a communication buffer between a single task and an ISR. This use case gets around a lot of the issues that I was pointing out with the Array send/receive, but it also says that there is a much lighter weight solution available also, using a simple ring buffer with a semaphore.

    You made a comment about the ISR setting up DMA into the queue, it CAN'T, as the queue does not expose the address of its circular buffer, because to do so would be unsafe, as queue inherently allow multiple readers and writers. (Note that Unix streams are by definition single reader/single writer).

    Perhaps a lighter weight Stream object could be built that makes the assumption of single reader and single writer (which removes the need for a lot of the critical sections in the code) could be designed, and this could have an API which allows you to get the address and size of a buffer in the stream of data to send/receive, but I am fairly sure this wouldn't be too hard to design without changes in the FreeRTOS kernel as it can be built with the existing primitives (a semaphore for the task side to block on, and an explicit buffer).

  • kyb

    Just looking for a similar solution for such a problem. And thinking to implement my own.
    I would like to test. Can you share the sources?