Pancakes:IPC EntryBased: Difference between revisions

From OSDev.wiki
Jump to navigation Jump to search
[unchecked revision][unchecked revision]
Content added Content deleted
m (added note on inter-thread and inter-process communication)
m (fixed problem)
Line 26: Line 26:
==== Code ====
==== Code ====
<source lang="c">
<source lang="c">
#ifdef B64
typedef unsigned long long uintptr;
#else
typedef unsigned int uintptr;
#endif
typedef unsigned long long uint64;
typedef unsigned int uint32;
typedef unsigned char uint8;
typedef unsigned short uint16;
typedef int int32;
#endif

struct _RBE {
struct _RBE {
uint32 w;
int32 w;
uint32 r;
int32 r;
uint8 d[];
uint8 d[];
};
};
Line 35: Line 47:
struct _RBME {
struct _RBME {
RB *rb;
RB *rb;
uint32 sz;
int32 sz;
};
};
typedef struct _RBME RBM;
typedef struct _RBME RBM;
Line 41: Line 53:
int rb_write_nbio(RBM *rbm, void *p, uint32 sz) {
int rb_write_nbio(RBM *rbm, void *p, uint32 sz) {
RB volatile *rb;
RB volatile *rb;
uint32 r;
int32 r;
uint32 w;
int32 w;
uint32 *h;
uint32 *h;
uint8 x, y;
int32 x, y;
uint32 asz;
int32 asz;
uint32 max;
int32 max;
rb = (RB volatile*)rbm->rb;
rb = (RB volatile*)rbm->rb;
Line 68: Line 80:
/* not enough space */
/* not enough space */
if ((w < r) && (w + asz) >= r) {
if ((w < r) && (w + asz) >= r) {
//printf("##$#\n");
return 0;
return 0;
}
}
Line 74: Line 85:
/* not enough space */
/* not enough space */
if ((w >= r) && ((rbm->sz - w) + r) < asz) {
if ((w >= r) && ((rbm->sz - w) + r) < asz) {
//printf("LESS THAN\n");
return 0;
return 0;
}
}
/* write length */
/* write length */
if (w == rbm->sz) {
w = 0;
}
rb->d[w++] = sz >> 8;
rb->d[w++] = sz >> 8;
if (w >= rbm->sz) {
if (w >= rbm->sz) {
Line 92: Line 99:
/* split write */
/* split write */
if (w >= r && (rbm->sz - w) < sz) {
max = rbm->sz - w;
if (w >= r && max < sz) {
/* copy first part */
/* copy first part */
max = rbm->sz - w;
for (x = 0; x < max; ++x) {
for (x = 0; x < max; ++x) {
rb->d[w + x] = ((uint8*)p)[x];
rb->d[w + x] = ((uint8*)p)[x];
Line 128: Line 135:
int rb_read_nbio(RBM *rbm, void *p, uint32 *sz, uint32 *advance) {
int rb_read_nbio(RBM *rbm, void *p, uint32 *sz, uint32 *advance) {
RB volatile *rb;
RB volatile *rb;
uint32 r;
int32 r;
uint32 volatile w;
int32 w;
uint32 h;
int32 h;
int x, y;
int32 x, y;
uint8 *_p;
uint8 *_p;
uint32 max;
int32 max;
_p = (uint8*)p;
_p = (uint8*)p;

Revision as of 23:45, 13 April 2014

IPC Shared Memory Entry Based Ring Buffer

This is a lock-free implementation of an entry based ring buffer. It supports message sizes up to 16KB using a 16-bit header, but you can easily change it to support a 24-bit or 32-bit header. Only one reader and one writer are supported.

I have tested this code fairly well, and there appears to be no bugs. To get a major increase in speed you can return a pointer to the inside of the buffer for reads but that is not secure unless you trust the writer.

This is designed to be used for inter-process communication as if you have threads sharing the same address space you might find a faster method would be to have a ring buffer of static pointers and use those to point to memory and use flags to determine if the memory should be freed after the reader uses it because this would eliminate the need for the writer to copy data into the buffer.

Example Usage

RBM      rbm;
char     buf[128];
uintptr  sz;

rbm.sz = 2048 - sizeof(RB);
rbm.rb = (RB*)malloc(2048);

rb_write_nbio(&rbm, &buf[0], 128);
sz = 128;
rb_read_nbio(&rbm, &buf[0], &sz, 0);

The code above will write to the ring buffer, and then read from it. You can have two separate threads with one reading and one writing. For bi-directional communication just use two different buffers.

Code

#ifdef B64
typedef unsigned long long  uintptr;
#else
typedef unsigned int		uintptr;
#endif
typedef unsigned long long  uint64;
typedef unsigned int		uint32;
typedef unsigned char		uint8;
typedef unsigned short		uint16;
typedef int					int32;
#endif

struct _RBE {
	int32			w;
	int32			r;
	uint8			d[];
};
typedef struct _RBE RB;

struct _RBME {
	RB				*rb;
	int32			sz;
};
typedef struct _RBME RBM;

int rb_write_nbio(RBM *rbm, void *p, uint32 sz) {
	RB volatile		*rb;
	int32			r;
	int32			w;
	uint32			*h;
	int32			x, y;
	int32			asz;
	int32			max;
	
	rb = (RB volatile*)rbm->rb;
	
	sz = sz & 0xffff;
	
	r = rb->r;
	w = rb->w;
	
	if (r > rbm->sz) {
		return 0;
	}
	
	if (w > rbm->sz) {
		return 0;
	}

	/* calculate total size including 16-bit length header */
	asz = sz + 2 + 2;
	
	/* not enough space */
	if ((w < r) && (w + asz) >= r) {
		return 0;
	}
	
	/* not enough space */
	if ((w >= r) && ((rbm->sz - w) + r) < asz) {
		return 0;
	}
	
	/* write length */
	rb->d[w++] = sz >> 8;
	if (w >= rbm->sz) {
		w = 0;
	}
	rb->d[w++] = sz & 0xff;
	if (w >= rbm->sz){
		w = 0;
	}
	
	/* split write */
	max = rbm->sz - w;
	if (w >= r && max < sz) {
		/* copy first part */
		for (x = 0; x < max; ++x) {
			rb->d[w + x] = ((uint8*)p)[x];
		}
		/* copy last part */
		for (y = 0; x < sz; ++x, ++y) {
			rb->d[y] = ((uint8*)p)[x];
		}
		
		rb->w = (w + sz) - rbm->sz;
		return 1;
	}
	
	/* straight write */
	for (x = 0; x < sz; ++x) {
		rb->d[w + x] = ((uint8*)p)[x];
	}
	
	/* 
		split read wont leave 'w' == rbm->sz but this will so we have
		to check for it and correct it else it messed up the reader 
		getting them off-track and essentially making communications
		hard to reliably recover if not impossible
	*/
	if (w + sz == rbm->sz) {
		rb->w = 0;
	} else {
		rb->w = w + sz;
	}
	return 1;
}

int rb_read_nbio(RBM *rbm, void *p, uint32 *sz, uint32 *advance) {
	RB volatile		*rb;
	int32			r;
	int32			w;
	int32			h;
	int32  			x, y;
	uint8			*_p;
	int32			max;
	
	_p = (uint8*)p;
		
	rb = (RB volatile*)rbm->rb;
	
	r = rb->r;
	w = rb->w;
	
	if (advance) {
		r = *advance;
	} else {
		if (r > rbm->sz) {
			/* bad header */
			return 0;
		}
	}
	
	if (w > rbm->sz) {
		/* bad header */
		return 0;
	}
	
	if (w == r) {
		return 0;
	}
	
	/* read size (tricky) */
	h = rb->d[r++] << 8;
	if (r == w) {
		return 0;
	}
	if (r >= rbm->sz) {
		r = 0;
	}
	h |= rb->d[r++];
	if (r == w) {
		return 0;
	}
	if (r >= rbm->sz) {
		r = 0;
	}
	
	if (h > (rbm->sz - r) + r) {
		return -1;
	}
	
	if (h > *sz) {
		return -1;
	}
	
	*sz = h;

	/* split read */
	if (r + h >= rbm->sz) {
		max = rbm->sz - r;
		for (x = 0; x < max; ++x) {
			*(_p++) = rb->d[r + x];
		}
		
		max = h - (rbm->sz - r);
		for (x = 0; x < max; ++x) {
			*(_p++) = rb->d[x];
		}
		if (advance) {
			*advance = (r + h) - rbm->sz;
		} else {
			rb->r = (r + h) - rbm->sz;
		}
		return 1;
	}
	
	/* straight read */
	for (x = 0; x < h; ++x) {
		*(_p++) = rb->d[r + x];
	}
	
	if (advance) {
		*advance = r + h;
	} else {
		rb->r = r + h;
	}
	
	return 1;
}