-

   rss_rss_hh_new

 - e-mail

 

 -

 LiveInternet.ru:
: 17.03.2011
:
:
: 51

:


( 2, )

, 08 2017 . 13:58 +

1, , ( ) , Apache Ignite.


, .


:




, , Apache Ignite, , .


...

, , .


, .


.


( ), , () .


, , , 1024. , , . , . ( ), , , .


, :


  • 4 [A,B,C,D]
  • (.. )

data node [JVM 1-4] ( ) client node [Client JVM] ( ).


data node client node ( ). , JVM 1 A,C,D, , , A (Primary) D (Backup).


Primary Backup node

data node Primary Backup, .


Primary node Backup , , , Backup node.


Primary node , Backup node Primary.


Primary node , Backup node, .


, node. Client JVM B , .



Partitioned Replicated.


, Partitioned- ( + N ) , Replicated data node.



Partitioned- , , Replicated .


. , , Partitioned-. , , Replicated-, .


, .



: Replicated Partitioned.


Replicated- , (ignite-sys-cache) , , , .


Partitioned- (ignite-atomics-sys-cache) , , .


, :


  1. .
  2. ignite-sys-cache, DATA_STRUCTURES_KEY, Map<_, DataStructureInfo> ( ), , , IgniteAtomicReference.
  3. ignite-atomics-sys-cache, DataStructureInfo , .
  4. .

, .


IgniteAtomicReference IgniteAtomicLong ( )


ignite-atomics-sys-cache GridCacheAtomicReferenceValue GridCacheAtomicLongValue.


val.


, IgniteAtomicReference:


// Replace the current value with newVal only if it currently equals expVal.
ref.compareAndSet(expVal, newVal);

EntryProcessor process:


EntryProcessor ...

EntryProcessor , .


process MutableEntry ( ) .


EntryProcessor, , ( ).


, , EntryProcessor .


Boolean process(MutableEntry> e, Object... args) {
    GridCacheAtomicReferenceValue val = e.getValue();

    T curVal = val.get();

    //  expVal  newVal    
    // ref.compareAndSet(expVal, newVal);
    if (F.eq(expVal, curVal)) {
        e.setValue(new GridCacheAtomicReferenceValue(newVal));

        return true;
    }

    return false;
}

IgniteAtomicLong IgniteAtomicReference, compareAndSet .


incrementAndGet , .


/**
 * Increment step applied to the cache entry that holds an atomic long value.
 * The key type is a wildcard because the key is not used here.
 *
 * @param e Mutable cache entry whose value is the current {@code GridCacheAtomicLongValue}.
 * @param args Additional arguments; not used by this method.
 * @return The stored value plus one, which is also written back into the entry.
 */
Long process(MutableEntry<?, GridCacheAtomicLongValue> e, Object... args) {
    GridCacheAtomicLongValue val = e.getValue();

    long newVal = val.get() + 1;

    // The entry receives a fresh value object rather than a mutated one.
    e.setValue(new GridCacheAtomicLongValue(newVal));

    return newVal;
}

IgniteAtomicSequence ( )


IgniteAtomicSequence...


// Obtain the IgniteAtomicSequence named "seqName" from the Ignite instance.
// NOTE(review): 0 and true are presumably the initial value and the create-if-absent flag - confirm against the Ignite API.
final IgniteAtomicSequence seq = ignite.atomicSequence("seqName", 0, true);

.


// Start a pessimistic, repeatable-read transaction.
// NOTE(review): the transaction is opened on seqView while the reads and writes go through dsView - confirm this is intended.
try (GridNearTxLocal tx = CU.txStartInternal(ctx, seqView, PESSIMISTIC, REPEATABLE_READ)) {
    GridCacheAtomicSequenceValue seqVal = cast(dsView.get(key), GridCacheAtomicSequenceValue.class);

    // The stored value becomes the local counter.
    locCntr = seqVal.get();

    // Upper bound of the locally held range.
    upBound = locCntr + off;

    // The stored value moves to the first number past the local range.
    seqVal.set(upBound + 1);

    // Write the updated GridCacheAtomicSequenceValue back under the same key.
    dsView.put(key, seqVal);

    // Commit the transaction.
    tx.commit();
}

, ...


seq.incrementAndGet(); 

.


, , IgniteAtomicSequence.


IgniteCountDownLatch ( )


:


latch.countDown();

:


 // Run the count-down inside a pessimistic, repeatable-read transaction.
 try (GridNearTxLocal txn = CU.txStartInternal(ctx, latchView, PESSIMISTIC, REPEATABLE_READ)) {
    GridCacheCountDownLatchValue stored = latchView.get(key);

    // A non-positive val yields zero; otherwise subtract val and clamp the result at zero.
    int remaining = val > 0 ? Math.max(stored.get() - val, 0) : 0;

    stored.set(remaining);

    // Write the updated latch value back under the same key.
    latchView.put(key, stored);

    // Commit the transaction.
    txn.commit();

    return remaining;
 }

0...


latch.await();

Continuous Queries, GridCacheCountDownLatchValue IgniteCountDownLatch .


IgniteCountDownLatch :


/** Internal latch (transient). */
private CountDownLatch internalLatch;

internalLatch . latch.await() :


// Wait on the local latch only while its count is still above zero.
if (internalLatch.getCount() > 0)
    internalLatch.await();

IgniteSemaphore ( )


...


semaphore.acquire();

:


// Loop around the transactional update of the semaphore state.
// NOTE(review): as excerpted, every path returns on the first pass; any retry handling is not shown here.
for (;;) {
    int expVal = getState();

    int newVal = expVal - acquires;

    try (GridNearTxLocal tx = CU.txStartInternal(ctx, semView, PESSIMISTIC, REPEATABLE_READ)) {
        GridCacheSemaphoreState val = semView.get(key);

        // Proceed only if the stored count still equals the state read before the transaction.
        boolean retVal = val.getCount() == expVal;

        if (retVal) {
            // Add the amount taken by this call (expVal - newVal) to the figure
            // kept for the local node id in the waiters map.
            {
                UUID nodeID = ctx.localNodeId();

                Map<UUID, Integer> map = val.getWaiters();

                int waitingCnt = expVal - newVal;

                if (map.containsKey(nodeID))
                    waitingCnt += map.get(nodeID);

                map.put(nodeID, waitingCnt);

                val.setWaiters(map);
            }

            // Store the new count.
            val.setCount(newVal);

            semView.put(key, val);

            tx.commit();
        }

        return retVal;
    }
}

...


semaphore.release();

, , .


int newVal = cur + releases;

IgniteQueue ( )


, IgniteQueue ignite-atomics-sys-cache. colCfg.


// Obtain the IgniteQueue named "queueName" from the Ignite instance, configured by colCfg.
// NOTE(review): the 0 argument is presumably the capacity (0 = unbounded) - confirm against the Ignite API.
IgniteQueue queue = ignite.queue("queueName", 0, colCfg);

Atomicity Mode (TRANSACTIONAL, ATOMIC) IgniteQueue.


// Wrap one of two implementations in a proxy: the atomic one when cctx.atomic() is true,
// the transactional one otherwise.
queue = new GridCacheQueueProxy(cctx, cctx.atomic() ? 
    new GridAtomicCacheQueueImpl<>(name, hdr, cctx) : 
    new GridTransactionalCacheQueueImpl<>(name, hdr, cctx));

IgniteQueue :


class GridCacheQueueHeader{
   private long head;
   private long tail;
   private int cap;
... 

AddProcessor...


/**
 * Add step applied to the cache entry that holds the queue header.
 * The key type is a wildcard because the key is not used here.
 *
 * @param e Mutable cache entry whose value is the current {@code GridCacheQueueHeader}.
 * @param args Additional arguments; not used by this method.
 * @return {@code QUEUE_REMOVED_IDX} if {@code queueRemoved(hdr, id)} reports removal,
 *      {@code null} if {@code spaceAvailable(hdr, size)} is false, otherwise the tail
 *      index of the header as it was before this update.
 */
Long process(MutableEntry<?, GridCacheQueueHeader> e, Object... args) {
    GridCacheQueueHeader hdr = e.getValue();

    boolean rmvd = queueRemoved(hdr, id);

    // Leave the header untouched when the queue is gone or has no room.
    if (rmvd || !spaceAvailable(hdr, size))
        return rmvd ? QUEUE_REMOVED_IDX : null;

    GridCacheQueueHeader newHdr = new GridCacheQueueHeader(hdr.id(),
        hdr.capacity(),
        hdr.collocated(),
        hdr.head(),
        hdr.tail() + size, // advance the tail by size
        hdr.removedIndexes());

    e.setValue(newHdr);

    // The caller receives the old tail, i.e. the header value from before the update.
    return hdr.tail();
}

, , .


...


// Build the key of the queue item from its index.
// NOTE(review): idx is presumably the hdr.tail() value returned by the add processor - confirm.
QueueItemKey key = itemKey(idx);

:


cache.getAndPut(key, item);

, tail, head...


GridCacheQueueHeader newHdr = new GridCacheQueueHeader(hdr.id(),
    hdr.capacity(),
    hdr.collocated(),
    hdr.head() + 1, // advance the head by one
    hdr.tail(),
    null);

.


// Apply PollProcessor through transformHeader; the result is used as the item index.
Long idx = transformHeader(new PollProcessor(id));

// Build the item key for that index.
QueueItemKey key = itemKey(idx);

// The value returned by getAndRemove for that key is the polled item.
T data = (T)cache.getAndRemove(key);

GridAtomicCacheQueueImpl GridTransactionalCacheQueueImpl , :


  • GridAtomicCacheQueueImpl hdr.tail(), .


  • GridTransactionalCacheQueueImpl .

, GridAtomicCacheQueueImpl , : , .


, poll , , . , .


.


// Keep retrying the removal until it yields a non-null item or RETRY_TIMEOUT has elapsed.
final long deadline = U.currentTimeMillis() + RETRY_TIMEOUT;

for (;;) {
    if (U.currentTimeMillis() >= deadline)
        break;

    data = (T)cache.getAndRemove(key);

    if (data == null)
        continue;

    return data;
}

, , .



, , , ConcurrentHashMap , .


, , .


, , .

Original source: habrahabr.ru (comments, light).

https://habrahabr.ru/post/328368/

:  

: [1] []
 

:
: 

: ( )

:

  URL