9
0
mirror of https://github.com/Winds-Studio/Leaf.git synced 2025-12-26 10:29:13 +00:00

optimize MpmcQueue memory layout (#427)

* optimize MpmcQueue memory layout

fix share cache line

* move to tail
This commit is contained in:
hayanesuru
2025-07-27 13:14:03 +09:00
committed by GitHub
parent deec3e5d37
commit 107d616ff2

View File

@@ -29,20 +29,14 @@ public final class MpmcQueue<T> {
@Nullable
private final T[] buffer;
@SuppressWarnings("unused")
private final Padded padded1 = new Padded();
@SuppressWarnings("FieldMayBeFinal")
private volatile long reads = 0L;
@SuppressWarnings("unused")
private final Padded padded2 = new Padded();
@SuppressWarnings("FieldMayBeFinal")
private volatile long writes = 0L;
private final PaddedReads reads = new PaddedReads();
private final PaddedWrites writes = new PaddedWrites();
static {
try {
MethodHandles.Lookup l = MethodHandles.lookup();
READ = l.findVarHandle(MpmcQueue.class, "reads", long.class);
WRITE = l.findVarHandle(MpmcQueue.class, "writes", long.class);
READ = l.findVarHandle(PaddedReads.class, "reads", long.class);
WRITE = l.findVarHandle(PaddedWrites.class, "writes", long.class);
} catch (ReflectiveOperationException e) {
throw new ExceptionInInitializerError(e);
}
@@ -72,7 +66,7 @@ public final class MpmcQueue<T> {
}
public boolean send(@NotNull final T item) {
long write = (long) WRITE.getAcquire(this);
long write = (long) WRITE.getAcquire(this.writes);
boolean success;
long newWrite = 0L;
long index = 0L;
@@ -80,26 +74,26 @@ public final class MpmcQueue<T> {
while (true) {
spinWait(attempts++);
final long inProgressCnt = (write & PENDING_MASK);
if ((((write >>> INDEX_SHIFT) + 1L) & mask) == ((long) READ.getVolatile(this) >>> INDEX_SHIFT)) {
if ((((write >>> INDEX_SHIFT) + 1L) & mask) == ((long) READ.getVolatile(this.reads) >>> INDEX_SHIFT)) {
success = false;
break;
}
if (inProgressCnt == MAX_IN_PROGRESS) {
write = (long) WRITE.getAcquire(this);
write = (long) WRITE.getAcquire(this.writes);
continue;
}
index = ((write >>> INDEX_SHIFT) + inProgressCnt) & mask;
if (((index + 1L) & mask) == ((long) READ.getVolatile(this) >>> INDEX_SHIFT)) {
if (((index + 1L) & mask) == ((long) READ.getVolatile(this.reads) >>> INDEX_SHIFT)) {
success = false;
break;
}
newWrite = write + 1L;
if (WRITE.weakCompareAndSetAcquire(this, write, newWrite)) {
if (WRITE.weakCompareAndSetAcquire(this.writes, write, newWrite)) {
success = true;
break;
}
write = (long) WRITE.getVolatile(this);
write = (long) WRITE.getVolatile(this.writes);
}
if (!success) {
return false;
@@ -112,17 +106,17 @@ public final class MpmcQueue<T> {
: write >>> INDEX_SHIFT == index
? write + (1L << INDEX_SHIFT) - 1L & (mask << INDEX_SHIFT | DONE_PENDING_MASK)
: write + (1L << DONE_SHIFT);
if (WRITE.weakCompareAndSetRelease(this, write, n)) {
if (WRITE.weakCompareAndSetRelease(this.writes, write, n)) {
break;
}
write = (long) WRITE.getVolatile(this);
write = (long) WRITE.getVolatile(this.writes);
spinWait(attempts++);
}
return true;
}
public @Nullable T recv() {
long read = (long) READ.getAcquire(this);
long read = (long) READ.getAcquire(this.reads);
boolean success;
long index = 0;
long newRead = 0L;
@@ -130,25 +124,25 @@ public final class MpmcQueue<T> {
while (true) {
spinWait(attempts++);
final long inProgressCnt = (read & PENDING_MASK);
if ((read >>> INDEX_SHIFT) == ((long) WRITE.getVolatile(this) >>> INDEX_SHIFT)) {
if ((read >>> INDEX_SHIFT) == ((long) WRITE.getVolatile(this.writes) >>> INDEX_SHIFT)) {
success = false;
break;
}
if (inProgressCnt == MAX_IN_PROGRESS) {
read = (long) READ.getAcquire(this);
read = (long) READ.getAcquire(this.reads);
continue;
}
index = ((read >>> INDEX_SHIFT) + inProgressCnt) & mask;
if ((index & mask) == ((long) WRITE.getVolatile(this) >>> INDEX_SHIFT)) {
if ((index & mask) == ((long) WRITE.getVolatile(this.writes) >>> INDEX_SHIFT)) {
success = false;
break;
}
newRead = read + 1L;
if (READ.weakCompareAndSetAcquire(this, read, newRead)) {
if (READ.weakCompareAndSetAcquire(this.reads, read, newRead)) {
success = true;
break;
}
read = (long) READ.getVolatile(this);
read = (long) READ.getVolatile(this.reads);
}
if (!success) {
return null;
@@ -162,18 +156,18 @@ public final class MpmcQueue<T> {
: read >>> INDEX_SHIFT == index
? read + (1L << INDEX_SHIFT) - 1L & (mask << INDEX_SHIFT | DONE_PENDING_MASK)
: read + (1L << DONE_SHIFT);
if (READ.weakCompareAndSetRelease(this, read, n)) {
if (READ.weakCompareAndSetRelease(this.reads, read, n)) {
break;
}
read = (long) READ.getVolatile(this);
read = (long) READ.getVolatile(this.reads);
spinWait(attempts++);
}
return result;
}
public int length() {
final long reads = (long) READ.getVolatile(this);
final long writes = (long) WRITE.getVolatile(this);
final long reads = (long) READ.getVolatile(this.reads);
final long writes = (long) WRITE.getVolatile(this.writes);
final long readIndex = (reads >>> INDEX_SHIFT);
final long writeIndex = (writes >>> INDEX_SHIFT);
return (int) (readIndex <= writeIndex ? writeIndex - readIndex : writeIndex + capacity - readIndex);
@@ -185,8 +179,8 @@ public final class MpmcQueue<T> {
}
public int remaining() {
final long reads = (long) READ.getVolatile(this);
final long writes = (long) WRITE.getVolatile(this);
final long reads = (long) READ.getVolatile(this.reads);
final long writes = (long) WRITE.getVolatile(this.writes);
final long readIndex = (reads >>> INDEX_SHIFT);
final long writeIndex = (writes >>> INDEX_SHIFT);
final long len = readIndex <= writeIndex ?
@@ -196,7 +190,7 @@ public final class MpmcQueue<T> {
}
@SuppressWarnings("unused")
private final static class Padded {
private final static class PaddedReads {
private byte i0, i1, i2, i3, i4, i5, i6, i7, i8, i9, i10, i11, i12, i13, i14, i15;
private byte j0, j1, j2, j3, j4, j5, j6, j7, j8, j9, j10, j11, j12, j13, j14, j15;
private byte k0, k1, k2, k3, k4, k5, k6, k7, k8, k9, k10, k11, k12, k13, k14, k15;
@@ -205,5 +199,20 @@ public final class MpmcQueue<T> {
private byte n0, n1, n2, n3, n4, n5, n6, n7, n8, n9, n10, n11, n12, n13, n14, n15;
private byte o0, o1, o2, o3, o4, o5, o6, o7, o8, o9, o10, o11, o12, o13, o14, o15;
private byte p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, p10, p11, p12, p13, p14, p15;
private volatile long reads;
}
@SuppressWarnings("unused")
private final static class PaddedWrites {
private byte i0, i1, i2, i3, i4, i5, i6, i7, i8, i9, i10, i11, i12, i13, i14, i15;
private byte j0, j1, j2, j3, j4, j5, j6, j7, j8, j9, j10, j11, j12, j13, j14, j15;
private byte k0, k1, k2, k3, k4, k5, k6, k7, k8, k9, k10, k11, k12, k13, k14, k15;
private byte l0, l1, l2, l3, l4, l5, l6, l7, l8, l9, l10, l11, l12, l13, l14, l15;
private byte m0, m1, m2, m3, m4, m5, m6, m7, m8, m9, m10, m11, m12, m13, m14, m15;
private byte n0, n1, n2, n3, n4, n5, n6, n7, n8, n9, n10, n11, n12, n13, n14, n15;
private byte o0, o1, o2, o3, o4, o5, o6, o7, o8, o9, o10, o11, o12, o13, o14, o15;
private byte p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, p10, p11, p12, p13, p14, p15;
private volatile long writes;
}
}