Focus on usage
A collection is a number of objects defined at 1 instance in time.
Stream.fill(9001)("lo")
<- not asynchronousSet(1, 2, 3, 4)
List("a", "b", "c")
Seq.empty
An asynchronous stream is a source of objects, but the objects are produced at different points in time.
Technically, anywhere you want to handle something asynchronously.
This means that you can use it to represent a single asynchronous result, such as a Future
.
For practical reasons though, similar to how nobody wraps singular objects in List
or Seq
, we normally talk about asynchronous streams when there is a possibility of more than 1 result object.
$("#textbox").addEventListener("keypress", function(event) {
// do stuff
});
When we use callbacks like this, the objects (in this case, HTML events) are all ephemeral; they are "lost" after the function is run (unless if we store them manually).
Things also quickly become unmaintainable if we want to support different behaviour depending on the event (e.g. only run the function if the key was "3")
In Rx, the term Observable is used to describe an asynchronous stream.
So from now on, we will use this (shorter) word.
There are several ways to make an Observable:
// From a collection
val obsInts: Observable[Int] = Observable.items(1, 2, 3)
// Emission of consecutive values at intervals
val obsLongsAtIntervals: Obervable[Long] = Observable.interval(100 millis)
Note: There are other ways of doing this
// a Subject is an observable that allows you to put things in it.
val publishSubject = PublishSubject.create[String]
val someSwingTextField = // your Swing TextField
someSwingTextField.subscribe( { case ValueChanged(x) => publishSubject.onNext(x.text) } )
PublishSubject
, which implements the Subject
interface AND the Observable
interfaceSubjects
allow you to push things into themSubject
, all of which behave differently. Read about them hereThanks to RxScala, Observable
objects have a composition API that is very similar to the standard Scala library.
val periodic: Observable[Long] = Observable.interval(100 millis) // Start with this
val onlyOdds = periodic.filter(_ % 2 == 1)
val times10 = periodic.map(_ * 10)
There are also other transformers that deal specifically with the async nature of Observables.
For more details, read the guide
Essentially callback functions as a type.
trait Observer[-T] {
def onNext(value : T) : Unit
def onError(error : hrowable) : Unit
def onCompleted() : Unit
}
val observer = Observer[Int](
onNext = println(_),
onError = e => println(e),
onCompleted = () => println("all done")
)
val subscriptionObserver = intObservable.subscribe(observer)
val subscriptionFunLit = intObservable.subscribe(onNext = println(_))
subscribe
method (or a function literal)Subscription
, which is an object you call unsubscribe
on to tell the Observable that it should no longer invoke the Observer for that Subscription.val fileMonitorActor = // FileMonitor Actor
val printPath = { p: Path => println(s"Something was modified in $p" }
// Callback for 1 file
fileMonitorActor ! RegisterCallback(
ENTRY_MODIFY
path = Paths get "/Users/lloyd/Desktop/test" ,
printPath )
// Callback for another file ...
fileMonitorActor ! RegisterCallback(
ENTRY_MODIFY
path = Paths get "/Users/lloyd/Desktop/test2" ,
printPath )
val monitor = RxMonitor()
val observable = monitor.observable
val testObservable = observer.filter(_.path.toString.split('/').last == "test")
val test2Observable = observer.filter(_.path.toString.split('/').last == "test2")
val observer = Observer[EventAtPath](onNext = { event => println(s"Something was modified in: ${event.path}")})
monitor.registerPath(ENTRY_MODIFY, Paths get "/Users/lloyd/Desktop")
testObservable.subscribe(observer)
scalaObservable.subscribe(observer)