A couple of days ago, I released v0.1.3 of Schwatcher, which adds the ability to monitor events on file paths through a composable Rx Observable interface. This post tries to answer the obvious questions: what does that even mean, and why should you care?

The original version of Schwatcher let you tell a MonitorActor which callback to fire when a certain type of event happened on a file path. This works fine, and there are people out there using it in production as is. The limitation of this approach is that, at least by default, the events are difficult to treat as data and thus difficult to compose.

With Rx, we turn file path events into an asynchronous stream/channel. Essentially, you tell an RxMonitor object what path and event type you want to monitor, and when a matching event happens, it gets pushed into the monitor's observable (the stream). You can then filter, map, or fold over this data stream, creating new data streams. If you wish to cause side effects, you can add one or more observers to these data streams.

Note: this blog post applies to v0.1.3 of Schwatcher, which uses v0.18.1 of RxScala. Future versions may introduce breaking changes that invalidate the examples in this blog post.

Example

Suppose we have the following directory structure:

directory1
  - directoryFile1

Let’s set up an RxMonitor object to monitor for file creation and modification events in directory1 (note: all operations on RxMonitor objects are thread-safe).

While we’re at it, let’s grab the base observable from the monitor as well. Note that this Observable pushes EventAtPath objects to its Observers according to the registerPath and unregisterPath calls made on its parent RxMonitor. More on what an Observer is later, but for now, think of an Observable as a data stream and an Observer as an object that gets pushed new objects from the Observable that it is, well, observing.

import com.beachape.filemanagement.RxMonitor
import com.beachape.filemanagement.Messages.EventAtPath
import java.nio.file.Paths
import java.nio.file.StandardWatchEventKinds._
import rx.lang.scala.Observer

val monitor = RxMonitor()
val observable = monitor.observable
val directory1 = Paths get "/Users/lloyd/Desktop/directory1"

monitor.registerPath(ENTRY_MODIFY, directory1)
monitor.registerPath(ENTRY_CREATE, directory1)

Let’s create 2 more Observables. Let’s make one called createsOnly that will only care about create events in the directory and another one called scalaSourceCreatesOnly that only cares about create events for files ending in .scala. Notice that we’re composing here :)

val createsOnly = observable.filter(_.event == ENTRY_CREATE)
val scalaSourceCreatesOnly = createsOnly.filter(_.path.toString.endsWith(".scala"))
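
The example above only uses filter, but map and fold compose in the same way. Here is a minimal sketch of that idea, not something taken from Schwatcher itself: FakeEvent is a hypothetical stand-in for EventAtPath, and the source stream is built from a fixed list with Observable.from so that it runs without touching the file system. The paths are made up for illustration.

```scala
import java.nio.file.{Path, Paths}
import rx.lang.scala.Observable

// Hypothetical stand-in for Schwatcher's EventAtPath (not the real class).
case class FakeEvent(event: String, path: Path)

// A fixed, finite stream of events instead of a live RxMonitor observable.
val events = Observable.from(Seq(
  FakeEvent("ENTRY_CREATE", Paths.get("/tmp/directory1/hello")),
  FakeEvent("ENTRY_MODIFY", Paths.get("/tmp/directory1/hello")),
  FakeEvent("ENTRY_CREATE", Paths.get("/tmp/directory1/speedy.scala"))
))

// filter: keep only create events
val creates = events.filter(_.event == "ENTRY_CREATE")

// map: turn each create event into just the file name
val createdNames = creates.map(_.path.getFileName.toString)

// fold: count the create events; the result is emitted once the stream completes
val createCount = creates.foldLeft(0)((count, _) => count + 1)

createdNames.subscribe(onNext = { name => println(s"Created: $name") })
createCount.subscribe(onNext = { n => println(s"Number of creates: $n") })
```

One thing to keep in mind: a fold only emits once its source completes, so on a live monitor stream you would not see the count until the stream ends. A running aggregate such as scan is probably a better fit there.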

Now, let’s create some basic Observers that we can pass to the subscribe method of our new Observables. At minimum, an Observer implements an onNext function, which takes an element pushed to it from the Observable it subscribes to and returns nothing (Unit). It may optionally implement onError (a function that takes a Throwable as an argument and returns nothing) and onCompleted (a zero-argument function that is called when the Observable it is subscribed to is finished and will not send any further objects):

val createAndModifyObserver = Observer[EventAtPath](onNext = { event => println(s"Something was created or modified: $event")})
val createOnlyObserver = Observer[EventAtPath](onNext = { event => println(s"Something was created: $event")})

observable.subscribe(createAndModifyObserver)
createsOnly.subscribe(createOnlyObserver)
/*
 * Equivalent to declaring an Observer separately and attaching it via #subscribe (as seen above):
 * {{{
 * val createScalaOnlyObserver = Observer[EventAtPath](onNext = { event => println(s"A Scala source file was created: $event")})
 * scalaSourceCreatesOnly.subscribe(createScalaOnlyObserver)
 * }}}
 *
 * This works because Observer as a type is just a way of binding 3 different functions:
 * onNext, onCompleted, and onError.
 */
scalaSourceCreatesOnly.subscribe(onNext = { event => println(s"A Scala source file was created: $event")})
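
The Observers above only supply onNext. As a rough sketch of what handling all three callbacks might look like, the snippet below passes onNext, onError, and onCompleted straight to subscribe. The source here is a made-up, finite stream of file names built with Observable.from, chosen only so that the completion callback actually fires.

```scala
import rx.lang.scala.Observable

// Stand-in source (an assumption for illustration): emits three names, then completes.
val source = Observable.from(Seq("a.scala", "b.txt", "c.scala"))

source.subscribe(
  // called once per element pushed by the Observable
  onNext = (name: String) => println(s"Got: $name"),
  // called at most once, if the stream fails
  onError = (err: Throwable) => println(s"Stream failed: ${err.getMessage}"),
  // called once, when the stream has no more elements to send
  onCompleted = () => println("Stream finished")
)
// prints:
// Got: a.scala
// Got: b.txt
// Got: c.scala
// Stream finished
```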

Now let’s make stuff happen in another terminal.

$ ~/Desktop/directory1: touch hello
$ ~/Desktop/directory1: echo lol >> hello
$ ~/Desktop/directory1: touch speedy.scala
$ ~/Desktop/directory1: echo 'println("hmm")' >> speedy.scala

The following will be printed:

Something was created or modified: EventAtPath(ENTRY_CREATE,/Users/lloyd/Desktop/directory1/hello)
Something was created: EventAtPath(ENTRY_CREATE,/Users/lloyd/Desktop/directory1/hello)
Something was created or modified: EventAtPath(ENTRY_MODIFY,/Users/lloyd/Desktop/directory1/hello)
Something was created or modified: EventAtPath(ENTRY_CREATE,/Users/lloyd/Desktop/directory1/speedy.scala)
Something was created: EventAtPath(ENTRY_CREATE,/Users/lloyd/Desktop/directory1/speedy.scala)
A Scala source file was created: EventAtPath(ENTRY_CREATE,/Users/lloyd/Desktop/directory1/speedy.scala)
Something was created or modified: EventAtPath(ENTRY_MODIFY,/Users/lloyd/Desktop/directory1/speedy.scala)

Lastly, since we’re done, let’s call the stop() method on the RxMonitor object so that subscribed Observers are notified and we stop the underlying MonitorActor as well. Cleaning up is A Good Thing (TM).

monitor.stop()

Conclusion

I hope this post has demonstrated the power of using RxScala’s Observable as an abstraction of asynchronous events into a tangible data structure, and how using it through Schwatcher might simplify the process of building your own applications. If you have any questions or spot any mistakes, please feel free to leave a comment.
