RxScala and Schwatcher
A couple days ago, I released v0.1.3 of Schwatcher, which introduces the ability to monitor events on file paths using a composable Rx Observable interface. “What does that even mean and why should you care?” is what this blog post tries to answer.
The original version of Schwatcher allowed you to tell a MonitorActor
what callback you want to fire when a certain type of event happened on a file path. This is fine and there are people out there using it in production as is. The limitation to this approach is that (at least by default), the events are difficult to treat as data and thus difficult to compose.
With Rx, we turn file path events into an asynchronous stream/channel. Essentially, you tell a RxMonitor
object what path and event type you want to monitor and when an event happens, it will get pushed into its observable
(the stream). You can then choose to filter, map, or fold over this data stream, creating new data streams. If you wish to cause side-effects, you can add one or more observer
s to these data streams.
Note: this blog post applies to v0.1.3 of Schwatcher, which uses v0.18.1 of RxScala. Future versions may introduce breaking changes that invalidate the examples in this blog post.
Example
Suppose we have the following directory structure:
1 2 |
|
Let’s set up an RxMonitor
object to monitor for file creation and modifications events in directory1
(note: all operations on RxMonitor
objects are thread-safe).
While we’re at it, let’s grab the base observable
from the monitor as well. Note that this Observable
will, according to the registerPath
and unregisterPath
calls made to its parent RxMonitor
, push all EventAtPath
s to its Observer
s. More on what an Observer
is later, but for now, think of an Observable
as a data stream and an Observer
as an object gets pushed new objects from the Observable
that it is, well, observing.
1 2 3 4 5 6 7 8 9 10 11 12 |
|
Let’s create 2 more Observables
. Let’s make one called createsOnly
that will only care about create events in the directory and another one called scalaSourceCreatesOnly
that only cares about create events for files ending in .scala
. Notice that we’re composing here :)
1 2 |
|
Now, let’s create some basic Observers
that we can pass to the subscribe
method of our new Observable
s. An Observer
at minimum implements an onNext function, which takes an element that will be pushed to it from the Observable
that it subscribes to and returns nothing (Unit
). It may optionally implement onError (a function which takes a Throwable
as an argument and returns nothing) and onCompleted (0 argument function that is called when the Observable
it is subscribed to is finished and will no longer send further objects):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
|
Now let’s make stuff happen in another terminal.
1 2 3 4 |
|
The following will be outputted
1 2 3 4 5 6 7 |
|
Lastly, since we’re done, let’s call the stop()
method on the RxMonitor
object so that subscribed Observers
are notified and we stop the underlying MonitorActor
as well. Cleaning up is A Good Thing (TM).
1
|
|
Conclusion
I hope this post has demonstrated the power of using RxScala’s Observable
as an abstraction of asynchronous events into a tangible data structure, and how using it through Schwatcher might simplify the process of building your own applications. If you have any questions or spot any mistakes, please feel free to leave a comment.