( 2, ) |
1, , ( ) , Apache Ignite.
, .
:
, , Apache Ignite, , .
, , .
, .
.
( ), , () .
, , , 1024. , , . , . ( ), , , .
, :
data node [JVM 1-4] ( ) client node [Client JVM] ( ).
data node client node ( ). , JVM 1 A,C,D, , , A (Primary) D (Backup).
data node Primary Backup, .
Primary node Backup , , , Backup node.
Primary node , Backup node Primary.
Primary node , Backup node, .
, node. Client JVM B , .
Partitioned Replicated.
, Partitioned- ( + N ) , Replicated data node.
Partitioned- , , Replicated .
. , , Partitioned-. , , Replicated-, .
, .
, Apache Ignite, .
: Replicated Partitioned.
Replicated- , (ignite-sys-cache
) , , , .
Partitioned- (ignite-atomics-sys-cache
) , , .
, :
ignite-sys-cache
, DATA_STRUCTURES_KEY
, Map<_, DataStructureInfo>
( ), , , IgniteAtomicReference
.ignite-atomics-sys-cache
, DataStructureInfo
, ., .
ignite-atomics-sys-cache
GridCacheAtomicReferenceValue
GridCacheAtomicLongValue
.
val
.
, IgniteAtomicReference
:
// , .
ref.compareAndSet(expVal, newVal);
EntryProcessor
process
:
Boolean process(MutableEntry> e, Object... args) {
GridCacheAtomicReferenceValue val = e.getValue();
T curVal = val.get();
// expVal newVal
// ref.compareAndSet(expVal, newVal);
if (F.eq(expVal, curVal)) {
e.setValue(new GridCacheAtomicReferenceValue(newVal));
return true;
}
return false;
}
IgniteAtomicLong
IgniteAtomicReference
, compareAndSet
.
incrementAndGet
, .
/**
 * Increments the long stored in the entry by one and returns the incremented value.
 *
 * @param e Entry holding the current GridCacheAtomicLongValue.
 * @param args Extra arguments; not used here.
 * @return The value after the increment.
 */
Long process(MutableEntry e, Object... args) {
    GridCacheAtomicLongValue current = e.getValue();

    long incremented = current.get() + 1;

    // Replace the entry's value with a fresh holder carrying the incremented number.
    e.setValue(new GridCacheAtomicLongValue(incremented));

    return incremented;
}
IgniteAtomicSequence
...
// IgniteAtomicSequence.
final IgniteAtomicSequence seq = ignite.atomicSequence("seqName", 0, true);
.
// Advance the stored sequence value inside a PESSIMISTIC / REPEATABLE_READ transaction.
// NOTE(review): the transaction is started on seqView while reads and writes go through dsView — confirm both address the same cache.
try (GridNearTxLocal tx = CU.txStartInternal(ctx, seqView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheAtomicSequenceValue seqVal = cast(dsView.get(key), GridCacheAtomicSequenceValue.class);
// The stored value becomes the local counter.
locCntr = seqVal.get();
// Upper bound is the local counter plus off; the stored value is moved one past that bound.
upBound = locCntr + off;
seqVal.set(upBound + 1);
// Write the updated GridCacheAtomicSequenceValue back under the same key.
dsView.put(key, seqVal);
// Commit the transaction.
tx.commit();
// NOTE(review): the snippet ends here without the closing brace of the try block.
, ...
seq.incrementAndGet();
.
, , IgniteAtomicSequence
.
:
latch.countDown();
:
// Count the latch down inside a PESSIMISTIC / REPEATABLE_READ transaction.
try (GridNearTxLocal tx = CU.txStartInternal(ctx, latchView, PESSIMISTIC, REPEATABLE_READ)) {
    GridCacheCountDownLatchValue state = latchView.get(key);

    // For a positive val, subtract it from the stored count without going below zero;
    // for any other val the count becomes zero.
    int remaining = val > 0 ? Math.max(state.get() - val, 0) : 0;

    state.set(remaining);

    // Store the updated latch value back under the same key.
    latchView.put(key, state);

    // Commit the transaction.
    tx.commit();

    return remaining;
}
0...
latch.await();
Continuous Queries, GridCacheCountDownLatchValue
IgniteCountDownLatch
.
IgniteCountDownLatch
:
/** Internal latch (transient). */
private CountDownLatch internalLatch;
internalLatch
. latch.await()
:
// Wait on the local latch only while its count is still above zero.
if (internalLatch.getCount() > 0)
internalLatch.await();
...
semaphore.acquire();
:
// Try to take `acquires` permits by updating the shared semaphore state in a transaction.
for (;;) {
    int expVal = getState();
    int newVal = expVal - acquires;

    try (GridNearTxLocal tx = CU.txStartInternal(ctx, semView, PESSIMISTIC, REPEATABLE_READ)) {
        GridCacheSemaphoreState val = semView.get(key);

        // Go ahead only if the stored count still equals the locally observed state.
        boolean retVal = val.getCount() == expVal;

        if (retVal) {
            // Record, keyed by the local node id, how many permits this node has taken,
            // adding to any number already registered for the node.
            {
                UUID nodeID = ctx.localNodeId();

                // Typed map: the values are added to an int below, which a raw Map does not allow.
                Map<UUID, Integer> map = val.getWaiters();

                int waitingCnt = expVal - newVal;

                if (map.containsKey(nodeID))
                    waitingCnt += map.get(nodeID);

                map.put(nodeID, waitingCnt);

                val.setWaiters(map);
            }

            // Store the new count and commit.
            val.setCount(newVal);

            semView.put(key, val);

            tx.commit();
        }

        // NOTE(review): this returns on the first pass whether or not the update was applied,
        // so the enclosing for (;;) never repeats as written — confirm the intended retry behaviour.
        return retVal;
    }
}
...
semaphore.release();
, , .
int newVal = cur + releases;
, IgniteQueue
ignite-atomics-sys-cache
. colCfg
.
// IgniteQueue.
IgniteQueue queue = ignite.queue("queueName", 0, colCfg);
Atomicity Mode (TRANSACTIONAL, ATOMIC) IgniteQueue
.
// Wrap the queue in a proxy; cctx.atomic() selects the atomic or the transactional implementation.
queue = new GridCacheQueueProxy(cctx, cctx.atomic() ?
new GridAtomicCacheQueueImpl<>(name, hdr, cctx) :
new GridTransactionalCacheQueueImpl<>(name, hdr, cctx));
IgniteQueue
:
class GridCacheQueueHeader{
private long head;
private long tail;
private int cap;
...
AddProcessor
...
/**
 * Reserves room for {@code size} items in the queue header stored in the entry.
 *
 * @param e Entry holding the current GridCacheQueueHeader.
 * @param args Extra arguments; not used here.
 * @return QUEUE_REMOVED_IDX when queueRemoved reports the queue as removed, null when
 *      spaceAvailable reports no room, otherwise the tail of the header as it was before the update.
 */
Long process(MutableEntry e, Object... args) {
    GridCacheQueueHeader hdr = e.getValue();

    // The removal check comes first; the space check is only consulted for a live queue.
    if (queueRemoved(hdr, id))
        return QUEUE_REMOVED_IDX;

    if (!spaceAvailable(hdr, size))
        return null;

    long advancedTail = hdr.tail() + size; // tail moved forward by the number of items being added

    e.setValue(new GridCacheQueueHeader(
        hdr.id(),
        hdr.capacity(),
        hdr.collocated(),
        hdr.head(),
        advancedTail,
        hdr.removedIndexes()));

    // The caller receives the old tail.
    return hdr.tail();
}
, , .
...
// ,
// hdr.tail()
QueueItemKey key = itemKey(idx);
:
cache.getAndPut(key, item);
, tail
, head
...
// New header: id, capacity, collocation flag and tail are copied; head moves forward by one; the last argument is null.
GridCacheQueueHeader newHdr = new GridCacheQueueHeader(hdr.id(),
hdr.capacity(),
hdr.collocated(),
hdr.head() + 1, // head advanced by one
hdr.tail(),
null);
.
// Apply a PollProcessor through transformHeader; presumably the result is the index of the item to take — TODO confirm.
Long idx = transformHeader(new PollProcessor(id));
// Build the item key from that index, then remove the item from the cache and keep the removed value.
QueueItemKey key = itemKey(idx);
T data = (T)cache.getAndRemove(key);
GridAtomicCacheQueueImpl
GridTransactionalCacheQueueImpl
, :
GridAtomicCacheQueueImpl
hdr.tail()
, .
GridTransactionalCacheQueueImpl
. , GridAtomicCacheQueueImpl
, : , .
, poll
, , . , .
.
// Keep retrying the removal until RETRY_TIMEOUT has elapsed since the loop was set up.
// NOTE(review): the loop spins without pausing between attempts.
long stop = U.currentTimeMillis() + RETRY_TIMEOUT;
while (U.currentTimeMillis() < stop) {
data = (T)cache.getAndRemove(key);
// A non-null result is returned immediately.
if (data != null)
return data;
}
, , .
, , , ConcurrentHashMap , .
, , .
, , .