c++ – Single Producer Single Consumer lockless ring buffer implementation

I am writing a simple ring buffer for my own education. Below is an attempt at the strategy described in http://www.cse.cuhk.edu.hk/~pclee/www/pubs/ancs09poster.pdf : the producer and the consumer each keep local copies of the write and read indexes on different cache lines and, to the extent possible, avoid touching the shared versions of those indexes. Performance seems to be on par with the Boost SPSC queue (boost::lockfree::spsc_queue). Any pointers on how to improve this are appreciated. I am using volatile variables rather than std::atomic because, for reasons I do not understand, performance is better with volatile. Any insight there would be very welcome. I understand that without std::atomic the code might only work on x86_64.

Thanks in advance!

#include <uchar.h>

#include <array>
#include <atomic>
#include <cstddef>
#include <limits>
#include <new>
#include <utility>

// Single Producer Single Consumer ring buffer.
// based on http://www.cse.cuhk.edu.hk/~pclee/www/pubs/ancs09poster.pdf

/// Bounded, lock-free single-producer / single-consumer queue.
///
/// Exactly one thread may call the producer functions
/// (emplace_enqueue_one, emplace_enqueue_batch, commit_writes) and exactly
/// one thread may call the consumer functions (consume_one, consume_all).
/// One slot is always kept empty so that "empty" (w == r) can be told apart
/// from "full" (next(w) == r); the usable capacity is therefore MAX_SZ - 1.
///
/// Each side works on private copies of the indexes and only touches the
/// shared (atomic) indexes when its private view says the queue might be
/// full/empty, or when it publishes progress.
///
/// @tparam T       element type; constructed in place on enqueue and
///                 destroyed right after the consumer callback returns.
/// @tparam MAX_SZ  number of slots (usable capacity is MAX_SZ - 1).
template <class T, size_t MAX_SZ = 10>
class RingBuffer {

   static_assert(MAX_SZ >= 2,
                 "RingBuffer needs at least two slots: one is always kept empty");

   static const size_t cache_line = 64;

 public:
   RingBuffer()
       : // Shared control variables
         shared_r(0), shared_w(0),
         // Consumer state
         consumer_w(0), consumer_r(0),
         // Producer state
         producer_r(0), producer_w(0), uncommited_writes(0) {}

   // The buffer owns raw storage and atomics; copying it is meaningless.
   RingBuffer(const RingBuffer &) = delete;
   RingBuffer &operator=(const RingBuffer &) = delete;

   /// Destroys every element that was enqueued (committed or not) but never
   /// consumed. Must only run once both threads have stopped using the queue.
   ~RingBuffer() {
      for (size_t i = consumer_r; i != producer_w; i = calc_next(i))
         buffer[i].value.~T();
   }

   // Called only by the single producer thread
   // -----------------------------------------

   /// Constructs one element in place and immediately makes it (and any
   /// earlier uncommitted elements) visible to the consumer.
   /// @return true if the element was enqueued, false if the queue is full.
   template <class... ARG>
   bool emplace_enqueue_one(ARG &&... arg) {
      auto result = emplace_enqueue_batch(std::forward<ARG>(arg)...);
      commit_writes();
      return result;
   }

   /// Constructs one element in place WITHOUT publishing it; the consumer
   /// cannot see it until commit_writes() is called. This lets the caller
   /// amortize the expensive shared-index update over several writes.
   /// If the queue is full, pending writes are committed so the consumer
   /// can drain them, and false is returned.
   /// @return true if the element was enqueued, false if the queue is full.
   template <class... ARG>
   bool emplace_enqueue_batch(ARG &&... arg) {

      // Where would the write position be after we enqueue this element?
      const size_t next_w = calc_next(producer_w);

      // We consult the producer's cached copy of the read position
      // (producer_r), not the shared one. The cache can only lag behind
      // the real read position, so we may wrongly believe we are full
      // (false positive), but if the cache says "not full" that is
      // always correct.
      if (next_w == producer_r) {
         // We might be full. To be sure we need the more expensive read
         // of the shared read position.
         const size_t actual_r = get_shared_r();
         if (next_w == actual_r) {
            // We are indeed full. Force a commit so that the consumer can
            // see (and drain) uncommitted writes.
            commit_writes();
            return false;
         }
         // Not actually full: refresh our cached read position.
         producer_r = actual_r;
      }

      // Enqueue: the slot holds no live object, so construct in place.
      // Nothing is modified if T's constructor throws.
      ::new (static_cast<void *>(&buffer[producer_w].value))
          T(std::forward<ARG>(arg)...);

      // Advance only our private write position; the shared one is
      // updated by commit_writes().
      producer_w = next_w;
      uncommited_writes++;
      return true;
   }

   /// Publishes all uncommitted writes to the consumer (no-op if none).
   void commit_writes() {
      if (uncommited_writes) {
         uncommited_writes = 0;
         set_shared_w(producer_w);
      }
   }

   // Called only by the single consumer thread
   // -----------------------------------------

   /// Passes at most one element to c (as T&), then destroys it.
   /// @return number of elements consumed (0 or 1).
   template <class C>
   size_t consume_one(C &&c) {
      return consume_(std::forward<C>(c), 1);
   }

   /// Passes every currently visible element to c (as T&), destroying each
   /// one after the call.
   /// @return number of elements consumed.
   template <class C>
   size_t consume_all(C &&c) {
      return consume_(std::forward<C>(c), std::numeric_limits<size_t>::max());
   }

 private:
   // Raw storage for one element. The empty constructor/destructor keep the
   // slot uninitialized until the producer placement-constructs `value`,
   // so T does not need a default constructor and is never built twice.
   union Slot {
      T value;
      Slot() {}
      ~Slot() {}
   };

   // Consumes up to max_consume_count elements. The callback is taken by
   // forwarding reference so a stateful functor passed as an lvalue keeps
   // its state.
   template <class C>
   size_t consume_(C &&c, size_t max_consume_count) {
      size_t consumed_count = 0;
      while (consumed_count < max_consume_count) {
         // Could we be empty?
         if (consumer_w == consumer_r) {
            // We could, but to be sure we have to do the expensive read
            // of the shared write position.
            const size_t actual_w = get_shared_w();
            if (consumer_r == actual_w) {
               // Actually empty. If we consumed anything, publish the new
               // read position so the producer can reuse the slots.
               if (consumed_count)
                  set_shared_r(consumer_r);
               return consumed_count;
            }
            // Not actually empty: refresh our cached write position.
            consumer_w = actual_w;
         }
         c(buffer[consumer_r].value);
         buffer[consumer_r].value.~T();
         consumer_r = calc_next(consumer_r);
         consumed_count++;
      }
      // We consumed max_consume_count items; publish the read position.
      set_shared_r(consumer_r);
      return consumed_count;
   }

   // Index following p, wrapping around at MAX_SZ.
   static size_t calc_next(size_t p) {
      return (p < (MAX_SZ - 1)) ? p + 1 : 0;
   }

   // Acquire loads pair with the release stores below: a thread that
   // observes a published index also observes the element construction /
   // destruction that preceded the publication. On x86-64 these compile
   // to plain loads and stores, so they cost the same as `volatile`
   // while being well-defined on every platform.
   size_t get_shared_r() const { return shared_r.load(std::memory_order_acquire); }
   void set_shared_r(size_t r) { shared_r.store(r, std::memory_order_release); }
   size_t get_shared_w() const { return shared_w.load(std::memory_order_acquire); }
   void set_shared_w(size_t w) { shared_w.store(w, std::memory_order_release); }

   // alignas (rather than manual char padding) places each group at the
   // start of its own cache line relative to a suitably aligned object.
   // NOTE: before C++17, plain `new` may not honor the over-alignment of
   // heap-allocated instances.

   // cache line 1: shared control variables
   // the consumer has read (and destroyed) everything before this position
   alignas(cache_line) std::atomic<size_t> shared_r;
   // the producer has written (and published) everything before this position
   std::atomic<size_t> shared_w;

   // cache line 2: consumer state
   alignas(cache_line) size_t consumer_w; // last known write position (to the consumer)
   size_t consumer_r;                     // current consumer read position

   // cache line 3: producer state
   alignas(cache_line) size_t producer_r; // last known read position (to the producer)
   size_t producer_w;                     // current producer write position
   size_t uncommited_writes;              // writes not yet published via shared_w

   // cache line 4: start of actual buffer (uninitialized element slots)
   alignas(cache_line) std::array<Slot, MAX_SZ> buffer;
};
```