[ ] RxJava 1 |
Reactive log stream processing with RxJava Part I
ELK .
, - .
, , .*
. ,
, , , , , "", , "" .
, (filter) , , (group by) userID , , .
// Counts failed logins per remote IP inside 5-second windows and prints every
// (remoteIp, count) pair whose count exceeds 10 within a single window.
failedLogStream()
    .window(5, TimeUnit.SECONDS)
    .flatMap(window ->
        window
            .groupBy(propertyStringValue("remoteIP"))
            .flatMap(grouped -> grouped
                .count()
                .map(failedLoginsCount -> {
                    final String remoteIp = grouped.getKey();
                    return new Pair<>(remoteIp, failedLoginsCount);
                }))
    )
    // The pair's value is the per-IP failure count built in the map() above.
    .filter(pair -> pair.getValue() > 10)
    .forEach(System.out::println);
, ( ), reactive streams ( ).
, , - , Kafka Streams, Spark or Flink.
, , ( , ).
, Spring5, . spring-web-reactive , spring-web-mvc, () REST -, . Spring, reactive-streams-jvm, ( , , , , ).
Rx.NET, Netflix java, RxJava. , (Reactive EXtensions). , . RxJava, , ( ) , 2. , , Spring reactor , . .
Doug Lea , java.util.concurrent.Flow, , Java 9.
. , , . , , - , , .
Stream (- ), action () , , , , , , ( , ).
, , .
: , Gmail, (emails). , emails, , (CC). , , REST ContactService.
:
Future<List<Mail>> emailsFuture = mailstoreService.getUnreadEmails();
// Future.get() blocks the calling thread until the whole list is available.
List<Mail> emails = emailsFuture.get();
// Nothing below runs (and so nothing is rendered) before get() has returned.
// NOTE(review): the element type Contact is assumed — confirm against getContactsForEmails.
Future<List<Contact>> contacts = getContactsForEmails(emails);
for (Mail mail : emails) {
    streamRenderEmails(mail, contacts); // push() emails
}
Java 8 CompletableFuture ( thenCompose, thenCombine, thenAccept 50 , , , , ).
CompletableFuture> emailsFuture = mailstoreService.getUnreadEmails();
CompletableFuture> emailsFuture
.thenCompose(emails -> getContactsForEmails(emails)) // List
.thenAccept(emailsContactsPair -> streamRenderEmails(emailsContactsPair.getKey(), emailsContactsPair.getValue()))
Iterator List-, , - . SQL , , ResultSet ( rs.next()) .
/**
 * Pull-based cursor over a sequence: the consumer asks for each element.
 *
 * @param <E> the type of element returned by this iterator
 */
public interface Iterator<E> {
    /**
     * Returns {@code true} if the iteration has more elements.
     */
    boolean hasNext();

    /**
     * Returns the next element in the iteration.
     */
    E next();
}
" ?"
Iterable<Mail> emails = mailstoreService.getUnreadEmails();
Iterator<Mail> emailsIt = emails.iterator();
// The consumer drives the loop: it keeps asking whether another element exists.
while (emailsIt.hasNext()) {
    Mail mail = emailsIt.next(); // pulls the next element
    if (mail != null) {
        // ... handle the mail
    }
}
, , , , . reactive stream programming ( ).
Stream - (X Y, ).
Stream , 0..N :
'marble diagrams'.
, Stream- , . , , .
, , ( | ).
RxJava Observable () Stream- . Spring Reactor- Flux.
Observable stream , .
Observable stream , .
Observable (User), .
// Lookup declared as an Observable.
// NOTE(review): element type User is assumed from the surrounding text — confirm.
public Observable<User> findByUserId(String userId) {...}
// The same lookup declared as a Single
public Single<User> findByUserId(String userId) {...}
Observable , , Publish/Subscriber (/), Subscriber() 3
Observable<CartItem> cartItemsStream = ...;
// A Subscriber supplies three callbacks: onNext, onCompleted and onError.
Subscriber<CartItem> subscriber = new Subscriber<CartItem>() {
    @Override
    public void onNext(CartItem cartItem) {
        System.out.println("Cart Item added " + cartItem);
    }

    @Override
    public void onCompleted() {
        // nothing to do when the stream ends
    }

    @Override
    public void onError(Throwable e) {
        e.printStackTrace();
    }
};
cartItemsStream.subscribe(subscriber);
Stream-a, , Observer ().
Reactive () , Function ( ), , stream .
, stream ( immutable), ..
Observable filteredCartStream = cartStream.filter(new Func1() {
@Override
public Boolean call(CartItem cartItem) {
return cartItem.isLaptop();
}
});
Observable laptopCartItemsPriceStream = filteredCartStream.map(new Func1() {
@Override
public Long call(CartItem cartItem) {
try {
return priceService.getPrice(cartItem.getId());
} catch(PriceServiceException e) {
thrown new RuntimeException(e);
}
}
});
() Observable (filter, map, groupBy,...) Observable, , , - .
Observable priceStream = cartStream
.filter((cartItem) -> cartItem.isLaptop()).
.map((laptop) -> {
try {
return priceService.getPrice(cartItem.getId());
} catch(PriceServiceException e) {
thrown new RuntimeException(e);
}
});
, , priceStream
, priceService.getPrice()
, , . , rx- , ( ).
, Excel , , , , , , , , .
rx-operator, , - , .
, , , , , .
:
Observable- mover1 = Observable.create(s -> {
while (house.hasItems()) {
s.onNext(house.getItem());
}
s.onCompleted();
});
Observable
- mover2 = mover1.map(item -> putInBox(item));
Subscription mover3 = mover2.subscribe(box -> putInTruck(box),
() -> closeTruck()); // OnCompleted()
" 1 Observable
. , . 2 onNext()
, map()
. onNext()
, . 3, Subscriber
(), onNext()
, ."
RxJava , .
Stream , , (RxJava, RxJS, Rx.NET, etc) ReactiveX framework ( ).
, Spring Reactor ( , ).
, :
, ( , 100$, , )
, Observable>
( ) , groupBy
Observable<Integer> values = Observable.just(1, 4, 5, 7, 8, 9, 10);
// Split the numbers into two keyed sub-streams; a number divisible by 2 is "even".
Observable<GroupedObservable<String, Integer>> oddEvenStream =
        values.groupBy(number -> number % 2 == 0 ? "even" : "odd");
// concat() re-joins the groups one after another.
Observable<Integer> remergedStream = Observable.concat(oddEvenStream);
remergedStream.subscribe(number -> System.out.print(number + " "));
// Output:
//1 5 7 9 4 8 10
concat
, - , .
, concat
, , . , .
stream-, , , zip
Zip
, , - , , ( ), stream-.
stream- ( ). , , , .
PS: stream-.
, stream , , stream-.
"" , stream-, .
, combineLatest
, , stream-, .
, Observable
s. :
log("Before create Observable");
Observable<Integer> someIntStream = Observable
        .create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                log("Create");
                emitter.onNext(3);
                emitter.onNext(4);
                emitter.onNext(5);
                emitter.onComplete();
                log("Completed");
            }
        });
log("After create Observable");
log("Subscribing 1st");
someIntStream.subscribe((val) -> LOGGER.info("received " + val));
// Only an onNext handler is passed here; the handlers
// (for onError and onComplete) are left out.
log("Subscribing 2nd");
someIntStream.subscribe((val) -> LOGGER.info("received " + val));
, .
, , ObservableOnSubscribe
, , , - .
Observable
, , .
- , call()
3 , , stream .
, call(...)
. , - :
mainThread: Before create Observable
mainThread: After create Observable
mainThread: Subscribing 1st
mainThread: Create
mainThread: received 3
mainThread: received 4
mainThread: received 5
mainThread: Completed
mainThread: Subscribing 2nd
mainThread: Create
mainThread: received 3
mainThread: received 4
mainThread: received 5
mainThread: Completed
, rx . RxJava Observable Subscriber. "" .
Observable
, , - cold observables
( ). hot observables
( ), , .
Cold Observables
- . . , CD , cd . Hot Observables
. , . , , . Hot observable , . ( ).Subjects
Observable
, Observer
( Subscriber
, ( onNext()
) ) Observables
. , ReplaySubject
, (, , OutOfMemory
), PublishSubject
, .
, Observables
.
Observable.just("This", "is", "something")
Observable.from(Iterable collection)
Observable.from(Future future) - , `future`
, ELK, ElasticSearch , , 'pull-based'.
push-based, '' , , .
RabbitMq, , . Logstash RabbitMQ ( FluentD) ELK ElasticSearch RabbitMQ.
, Logstash , , / . , , , RabbitMQ .
RabbitMQ Logback Appender, Logstash.
: AmqpAppender
, RabbitMQ AMQP ( AMQP 0-9-1, 0-9).
ActiveMQ ( AMQP connector) AMQP 1.0, spring-amqp 0-9-1, 0-9, 1.0), 'org.apache.activemq.transport.amqp.AmqpProtocolException: Connection from client using unsupported AMQP attempted'
logstash-logback-encoder JSON Logstash. logstash RabbitMQ (exchange).
docker-compose, logstash-rabbitmq
docker-compose -f docker-compose-rabbitmq.yml up
./event-generate.sh
logstash.
, , , logstash. rabbitmq-output-plugin, :
# Publish every event to RabbitMQ.
output {
  rabbitmq {
    # exchange the events are published to, and its type
    exchange      => "logstash"
    exchange_type => "direct"
    # broker host name
    host          => "rabbitmq"
    # routing key attached to each published event
    key           => "my_app"
  }
}
RabbitMQ JMS , AMQP , .
(exchange) .
'routing-key', , . , 'logstash.'
/** Builds the connection factory from the configured {@code host} and {@code port}. */
@Bean
ConnectionFactory connectionFactory() {
    final CachingConnectionFactory factory = new CachingConnectionFactory(host, port);
    return factory;
}
/** Creates the admin helper, then declares the queue and its binding to the exchange. */
@Bean
RabbitAdmin rabbitAdmin() {
    final RabbitAdmin admin = new RabbitAdmin(connectionFactory());
    admin.declareQueue(queue());
    final Binding queueBinding = bindQueueFromExchange(queue(), exchange());
    admin.declareBinding(queueBinding);
    return admin;
}
/**
 * Wires a listener container that consumes from {@code queueName} and hands
 * each message to the given listener adapter.
 */
@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
    final SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
    listenerContainer.setConnectionFactory(connectionFactory);
    listenerContainer.setQueueNames(queueName);
    listenerContainer.setMessageListener(listenerAdapter);
    return listenerContainer;
}
/** Declares the queue named {@code queueName} as non-durable. */
@Bean
Queue queue() {
    final boolean durable = false;
    return new Queue(queueName, durable);
}
/** Describes the direct exchange named "logstash". */
DirectExchange exchange() {
    final String exchangeName = "logstash";
    return new DirectExchange(exchangeName);
}
/** Binds {@code queue} to {@code exchange} under the routing key "my_app". */
private Binding bindQueueFromExchange(Queue queue, DirectExchange exchange) {
    final String routingKey = "my_app";
    return BindingBuilder
            .bind(queue)
            .to(exchange)
            .with(routingKey);
}
/**
 * Adapts incoming AMQP messages to {@code Receiver.receive(...)}: the message
 * body is decoded as UTF-8 text before the listener method is invoked.
 *
 * @param receiver the object whose {@code receive} method handles each message
 * @return the configured listener adapter
 */
@Bean
MessageListenerAdapter listenerAdapter(Receiver receiver) {
    MessageConverter utf8TextConverter = new MessageConverter() {
        @Override
        public Message toMessage(Object o, MessageProperties messageProperties)
                throws MessageConversionException {
            // This converter is inbound-only; producing messages is not supported.
            throw new UnsupportedOperationException("Unsupported");
        }

        @Override
        public String fromMessage(Message message) throws MessageConversionException {
            // The Charset constant cannot fail to resolve, so no checked
            // UnsupportedEncodingException has to be caught (and swallowed) here.
            return new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        }
    };
    MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(receiver, utf8TextConverter);
    messageListenerAdapter.setDefaultListenerMethod("receive"); //the method in our Receiver class
    return messageListenerAdapter;
}
/** Exposes the message receiver as a bean. */
@Bean
Receiver receiver() {
    final Receiver messageReceiver = new Receiver();
    return messageReceiver;
}
'logstash', 'my_app'. MessageListenerAdapter , 'receive' Receiver
, .
, , hot observable
, , , PublishSubject.
/**
 * Receives AMQP messages and republishes each one, parsed as a JSON object,
 * through a {@link PublishSubject} that interested parties can subscribe to.
 *
 * NOTE(review): {@code log} and {@code gson} are used below but not declared in
 * the visible class — confirm where they are supplied.
 */
public class Receiver {
    // Typed subject: every emitted element is the parsed JSON object of one message.
    private final PublishSubject<JsonObject> publishSubject = PublishSubject.create();

    public Receiver() {
    }

    /**
     * Method invoked by Spring whenever a new message arrives
     * @param message amqp message; cast to String and parsed as JSON
     */
    public void receive(Object message) {
        log.info("Received remote message {}", message);
        JsonElement remoteJsonElement = gson.fromJson((String) message, JsonElement.class);
        JsonObject jsonObj = remoteJsonElement.getAsJsonObject();
        publishSubject.onNext(jsonObj);
    }

    /**
     * @return the subject on which the parsed messages are emitted
     */
    public PublishSubject<JsonObject> getPublishSubject() {
        return publishSubject;
    }
}
, SimpleMessageListenerContainer , ( ). Observable , ( onNext
,onComplete
, onError ):
//
Observable.create(s -> {
// Thread A
new Thread(() -> {
s.onNext("one");
s.onNext("two");
}).start();
// Thread B
new Thread(() -> {
s.onNext("three");
s.onNext("four");
}).start();
});
// Alternative: each Observable emits from exactly one thread of its own,
// and the two are combined with merge() instead of sharing one emitter.
Observable<String> obs1 = Observable.create(s -> {
    // Thread A
    new Thread(() -> {
        s.onNext("one");
        s.onNext("two");
    }).start();
});
Observable<String> obs2 = Observable.create(s -> {
    // Thread B
    new Thread(() -> {
        s.onNext("three");
        s.onNext("four");
    }).start();
});
Observable<String> c = Observable.merge(obs1, obs2);
Observable.serialize()
Subject.toSerialized()
, 1 Thread
ListenerContainer
, . , Subjects
, . .
, Part II ( 2) Rx Playground .