-

   rss_rss_hh_new

 - e-mail

 

 -

 LiveInternet.ru:
: 17.03.2011
:
:
: 51

:


[ ] RxJava 1

, 31 2017 . 13:48 +

Reactive log stream processing with RxJava Part l


ELK .
, - .


, , .*


. ,


, , , , , "", , "" .


, (filter) , , (group by) userID , , .


 failedLogStream()
     .window(5,TimeUnit.SECONDS)
     .flatMap(window ->
                window
                .groupBy(propertyStringValue("remoteIP"))
                .flatMap(grouped -> grouped
                    .count()
                    .map( failedLoginsCount -> {
                          final String remoteIp = grouped.getKey();
                          return new Pair<>(remoteIp, failedLoginsCount);
                    }))
     )
     .filter(pair -> pair.get > 10)
     .forEach(System.out::println);           

, ( ), reactive streams ( ).



, , - , Kafka Streams Spark or Flink.


, , ( , ).


, Spring5, . spring-web-reactive , spring-web-mvc, () REST -, . Spring, reactive-streams-jvm, ( , , , , ).


Rx.NET, Netflix java, RxJava. , (Reactive EXtensions). , . RxJava, , ( ) , 2. , , Spring reactor , . .


Doug Lea , java.util.concurrent.Flow, , Java 9.



. , , . , , - , , .


Never block


Stream (- ), action () , , , , , , ( , ).


, , .


: , Gmail, (emails). , emails, , (CC). , , REST ContactService.


:


Future> emailsFuture = mailstoreService.getUnreadEmails();  
List emails = emailsFuture.get(); //    
//  ,       
//    ,      ?

Future> contacts = getContactsForEmails(emails);  
for(Mail mail : emails) {  
  streamRenderEmails(mail, contacts); //push() emails 
}

Java 8 CompletableFuture ( thenCompose, thenCombine, thenAccept 50 , , , , ).


CompletableFuture> emailsFuture = mailstoreService.getUnreadEmails();

CompletableFuture> emailsFuture  
  .thenCompose(emails -> getContactsForEmails(emails)) //     List 
  .thenAccept(emailsContactsPair -> streamRenderEmails(emailsContactsPair.getKey(), emailsContactsPair.getValue()))

Iterator List-, , - . SQL , , ResultSet ( rs.next()) .


public interface Iterator {  
    /**
     *  {@code true},     .
     */
    boolean hasNext();

    /**
     *    .
     */
    E next();
}

" ?"


Iterable emails = mailstoreService.getUnreadEmails();  
Iterator emailsIt = emails.iterator();

while(emailsIt.hasNext()) {  
  Mail mail = emailsIt.next(); //           
  if(mail != null) {
      ....
  }
}

, , , , . reactive stream programming ( ).


Stream?


Everything is a stream


Stream - (X Y, ).


Stream , 0..N :


  • , ,
  • , ()

'marble diagrams'.


Marble diagram for Observable


, Stream- , . , , .


, , ( | ).


RxJava Observable () Stream- . Spring Reactor- Flux.

Observable stream , .

Observable stream , .

Observable (User), .

    public Observable findByUserId(String userId) {...}
    //  Single    
    public Single findByUserId(String userId) {...}

Observable , , Publish/Subscriber (/), Subscriber() 3

        Observable cartItemsStream = ...;

        Subscriber subscriber = new Subscriber() {
            @Override
            public void onNext(CartItem cartItem) {
                System.out.println("Cart Item added " + cartItem);
            }

            @Override
            public void onCompleted() {
            }

            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
            }
        };

        cartItemsStream.subscribe(subscriber);


Stream-a, , Observer ().


Reactive () , Function ( ), , stream .


, stream ( immutable), ..


Observable filteredCartStream = cartStream.filter(new Func1() {  
            @Override
            public Boolean call(CartItem cartItem) {
                return cartItem.isLaptop();
            }
        });

Observable laptopCartItemsPriceStream = filteredCartStream.map(new Func1() {  
            @Override
            public Long call(CartItem cartItem) {
                try {
                    return priceService.getPrice(cartItem.getId());
                } catch(PriceServiceException e) {
                    thrown new RuntimeException(e);
                }
            }
        });

() Observable (filter, map, groupBy,...) Observable, , , - .


Observable priceStream = cartStream  
                        .filter((cartItem) -> cartItem.isLaptop()).
                        .map((laptop) -> {
                             try {
                                  return priceService.getPrice(cartItem.getId());
                            } catch(PriceServiceException e) {
                                 thrown new RuntimeException(e);
                            }
                        });

, , priceStream, priceService.getPrice() , , . , rx- , ( ).


, Excel , , , , , , , , .


rx-operator, , - , .


, , , , , .


:


Observable mover1 = Observable.create(s -> {  
   while (house.hasItems()) {
    s.onNext(house.getItem());
   }
   s.onCompleted();
});

Observable mover2 = mover1.map(item -> putInBox(item));

Subscription mover3 = mover2.subscribe(box -> putInTruck(box),  
   () -> closeTruck()); //    OnCompleted()


" 1 Observable. , . 2 onNext(), map() . onNext() , . 3, Subscriber(), onNext(), ."


RxJava , .



Stream , , (RxJava, RxJS, Rx.NET, etc) ReactiveX framework ( ).


, Spring Reactor ( , ).


, :


**Filter**


, ( , 100$, , )


, Observable>( ) , groupBy


> **group by**


        Observable values = Observable.just(1,4,5,7,8,9,10);
        Observable> oddEvenStream = values.groupBy((number) -> number % 2 == 0 ? "odd":"even");
        Observable remergedStream = Observable.concat(oddEvenStream);
        remergedStream.subscribe(number -> System.out.print(number +" "));

//
//1 5 7 9 4 8 10 

concat, - , .
> **Concat**


, concat , , . , .


stream-, , , zip
> **Zip operator**


Zip , , - , , ( ), stream-.


>  ()


stream- ( ). , , , .


PS: stream-.


, stream , , stream-.


"" , stream-, .


, combineLatest , , stream-, .


> Combine latest


Push


, Observables. :


        log("Before create Observable");
        Observable someIntStream = Observable
                .create(new ObservableOnSubscribe() {
                    @Override
                    public void subscribe(ObservableEmitter emitter) throws Exception {
                        log("Create");
                        emitter.onNext(3);
                        emitter.onNext(4);
                        emitter.onNext(5);
                        emitter.onComplete();
                        log("Completed");
                    }
                });
        log("After create Observable");

        log("Subscribing 1st");
        someIntStream.subscribe((val) -> LOGGER.info("received " + val));
        //   
        // (for onError and onComplete)     , - 

        log("Subscribing 2nd");
        someIntStream.subscribe((val) -> LOGGER.info("received " + val));

, .
, , ObservableOnSubscribe, , , - .


Observable, , .


- , call() 3 , , stream .


, call(...) . , - :


mainThread: Before create Observable  
mainThread: After create Observable  
mainThread: Subscribing 1st  
mainThread: Create  
mainThread: received 3  
mainThread: received 4  
mainThread: received 5  
mainThread: Completed  
mainThread: Subscribing 2nd  
mainThread: Create  
mainThread: received 3  
mainThread: received 4  
mainThread: received 5  
mainThread: Completed  

, rx . RxJava Observable Subscriber. "" .


Observable, , - cold observables( ). hot observables( ), , .


  • Cold Observables - . . , CD , cd .
  • Hot Observables . , . , , . Hot observable , . ( ).

Subjects Observable, Observer ( Subscriber , ( onNext()) ) Observables . , ReplaySubject, (, , OutOfMemory), PublishSubject , .
, Observables .


Observable.just("This", "is", "something")  
Observable.from(Iterable collection)  
Observable.from(Future future) -    ,  `future`  

ELK- RabbitMQ , push


, ELK, ElasticSearch , , 'pull-based'.


push-based, '' , , .


RabbitMq, , . Logstash RabbitMQ ( FluentD) ELK ElasticSearch RabbitMQ.


, Logstash , , / . , , , RabbitMQ .


RabbitMQ Logback Appender, Logstash.


: AmqpAppender, RabbitMQ AMQP ( AMQP 0-9-1, 0-9).


ActiveMQ ( AMQP connector) AMQP 1.0, spring-amqp 0-9-1, 0-9, 1.0), 'org.apache.activemq.transport.amqp.AmqpProtocolException: Connection from client using unsupported AMQP attempted'


logstash-logback-encoder JSON Logstash. logstash RabbitMQ (exchange).


docker-compose, logstash-rabbitmq


docker-compose -f docker-compose-rabbitmq.yml up

./event-generate.sh
logstash.


, , , logstash. rabbitmq-output-plugin, :


output {  
    rabbitmq {
        exchange => logstash
        exchange_type => direct
        host => rabbitmq
        key => my_app
    }
}

RabbitMQ JMS , AMQP , .


amqp


(exchange) .


'routing-key', , . , 'logstash.'


AMQP . Spring c RabbitMq


    @Bean
    ConnectionFactory connectionFactory() {
        return new CachingConnectionFactory(host, port);
    }

    @Bean
    RabbitAdmin rabbitAdmin() {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());
        rabbitAdmin.declareQueue(queue());
        rabbitAdmin.declareBinding(bindQueueFromExchange(queue(), exchange()));
        return rabbitAdmin;
    }

    @Bean
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(queueName);
        container.setMessageListener(listenerAdapter);

        return container;
    }

    @Bean
    Queue queue() {
        return new Queue(queueName, false);
    }

    DirectExchange exchange() {
        return new DirectExchange("logstash");
    }

    private Binding bindQueueFromExchange(Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("my_app");
    }

    @Bean
    MessageListenerAdapter listenerAdapter(Receiver receiver) {
        MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(receiver,
                new MessageConverter() {
            public Message toMessage(Object o, MessageProperties messageProperties)
                    throws MessageConversionException {
                throw new RuntimeException("Unsupported");
            }

            public String fromMessage(Message message) throws MessageConversionException {
                try {
                    return new String(message.getBody(), "UTF-8");
                } catch (UnsupportedEncodingException e) {
                    throw new RuntimeException("UnsupportedEncodingException");
                }
            }
        });
        messageListenerAdapter.setDefaultListenerMethod("receive"); //the method in our Receiver class
        return messageListenerAdapter;
    }

    @Bean
    Receiver receiver() {
        return new Receiver();
    }

'logstash', 'my_app'. MessageListenerAdapter , 'receive' Receiver , .


, , hot observable, , , PublishSubject.


public class Receiver {  
    private PublishSubject publishSubject = PublishSubject.create();

    public Receiver() {
    }

    /**
     * Method invoked by Spring whenever a new message arrives
     * @param message amqp message
     */
    public void receive(Object message) {
        log.info("Received remote message {}", message);
        JsonElement remoteJsonElement = gson.fromJson ((String) message, JsonElement.class);
        JsonObject jsonObj = remoteJsonElement.getAsJsonObject();

        publishSubject.onNext(jsonObj);
    }

    public PublishSubject getPublishSubject() {
        return publishSubject;
    }
}

, SimpleMessageListenerContainer , ( ). Observable , ( onNext,onComplete, onError ):


//    
Observable.create(s -> {  
                    // Thread A
                    new Thread(() -> {
                        s.onNext("one");
                        s.onNext("two");
                    }).start();

                    // Thread B
                    new Thread(() -> {
                        s.onNext("three");
                        s.onNext("four");
                    }).start();
                });
//    

// 
Observable obs1 = Observable.create(s -> {  
                    // Thread A
                    new Thread(() -> {
                        s.onNext("one");
                        s.onNext("two");
                    }).start();
                  });

Observable obs2 = Observable.create(s -> {  
                    // Thread B
                    new Thread(() -> {
                        s.onNext("three");
                        s.onNext("four");
                    }).start();
                });

Observable c = Observable.merge(obs1, obs2); 

Observable.serialize() Subject.toSerialized(), 1 Thread ListenerContainer, . , Subjects , . .


, Part II ( 2) Rx Playground .

Original source: habrahabr.ru (comments, light).

https://habrahabr.ru/post/329894/

:  

: [1] []
 

:
: 

: ( )

:

  URL