The WatchService API was added in Java 7 and made it possible to monitor files from the JVM without external libraries such as JNotify, which require installing native libraries. Using this API in a project that needs file monitoring makes handling dependencies much simpler for both deployment and development.

Since Scala can call Java directly, I wanted to use this API when I was building Akanori-thrift, a trending-words detection service focused on the Japanese language. This post will not go over that service in detail (that would take up an entire post of its own, if not more), but my use-case there was to monitor a custom dictionary file for updates and then spawn a new instance of the Tokenizer that uses the updated state of the file.

I quickly realised a few pain-points:

  1. There was no Scala library for file monitoring (at the time),
  2. Using the WatchService API requires a blocking thread to get events,
  3. The WatchService API has no built-in support for recursive monitoring.
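To make pain-points 2 and 3 concrete, here is a minimal sketch of the raw Java 7 API called directly from Scala, without Schwatcher. The object and method names are hypothetical, and it assumes Scala 2.13+ for `scala.jdk.CollectionConverters`. It uses `poll` with a timeout rather than the blocking `take()` so that it returns even if no event arrives, and `register` only covers the directory itself, not its subdirectories.

```scala
import java.nio.file.{FileSystems, Path, StandardWatchEventKinds}
import java.util.concurrent.TimeUnit
import scala.jdk.CollectionConverters._

// Hypothetical helper, not part of Schwatcher: watch one directory with the plain JDK API.
object RawWatchServiceSketch {
  /** Registers `dir` for ENTRY_CREATE, runs `action`, then waits up to
    * `timeoutSeconds` for a signalled key and returns the created file names. */
  def createdNames(dir: Path, timeoutSeconds: Long)(action: => Unit): List[String] = {
    val watcher = FileSystems.getDefault.newWatchService()
    try {
      // Only `dir` itself is watched; files inside its subdirectories produce no
      // events unless each subdirectory is registered separately.
      dir.register(watcher, StandardWatchEventKinds.ENTRY_CREATE)
      action
      // watcher.take() would block the calling thread until an event arrives,
      // which is why long-running code runs this loop on a dedicated thread.
      val key = watcher.poll(timeoutSeconds, TimeUnit.SECONDS)
      if (key == null) Nil // timed out without any event
      else {
        val names = key.pollEvents().asScala.toList.collect {
          case ev if ev.kind == StandardWatchEventKinds.ENTRY_CREATE => ev.context.toString
        }
        key.reset() // re-arm the key so it can be signalled again
        names
      }
    } finally watcher.close()
  }
}
```

Calling `createdNames(dir, 30) { Files.createFile(dir.resolve("a.txt")) }` on an empty directory should return `List("a.txt")`. The timeout is generous because some JDK implementations poll the file system instead of receiving native notifications, so events can be delayed.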

To address these, I set out to create Schwatcher, a Scala library that wraps the Java 7 WatchService API and allows callbacks to be registered and unregistered on both directories and files, either as individual paths or recursively. Furthermore, I wanted to make the Java 7 API simple to use from Scala, in a way that is in line with the functional programming paradigm.
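Because the JDK API only watches a single directory per registration, recursive monitoring has to be built by walking the tree and registering every directory. The sketch below shows one way to do that with `Files.walkFileTree`; it is hypothetical rather than Schwatcher's actual code, and the object name and the choice of `ENTRY_MODIFY` are assumptions. It keeps a map from each `WatchKey` to its directory, because event contexts are paths relative to the watched directory.

```scala
import java.nio.file.{FileVisitResult, Files, Path, SimpleFileVisitor, WatchKey, WatchService}
import java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY
import java.nio.file.attribute.BasicFileAttributes

// Hypothetical sketch: emulate recursive watching on top of the non-recursive JDK API.
object RecursiveRegistrationSketch {
  /** Walks the tree rooted at `root` and registers every directory (including
    * `root` itself) with `watcher`, returning a map from WatchKey to directory. */
  def registerAll(root: Path, watcher: WatchService): Map[WatchKey, Path] = {
    require(Files.isDirectory(root), s"$root is not a directory")
    var keys = Map.empty[WatchKey, Path]
    Files.walkFileTree(root, new SimpleFileVisitor[Path] {
      override def preVisitDirectory(dir: Path, attrs: BasicFileAttributes): FileVisitResult = {
        // Each directory needs its own registration.
        keys += dir.register(watcher, ENTRY_MODIFY) -> dir
        FileVisitResult.CONTINUE
      }
    })
    keys
  }
}
```

Directories created after the walk are not covered; a complete implementation would also register new subdirectories when it sees an `ENTRY_CREATE` event for them.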

Components

The main components of the Schwatcher library include:

  1. Akka actors: While building Akanori-thrift, I already knew that I wanted to use Akka actors as an abstraction for concurrency, because of their resilience and their concurrency control tools (Agents in particular).
  2. Threads: After reading this awesome blog post on how to use Akka actors with WatchService by encapsulating the blocking loop in a thread via a Runnable, I knew I wanted to use this pattern for Schwatcher.
  3. CallbackRegistry: A callback registry that maps paths to a list of callback functions, which get called when the Java 7 service signals that an event has occurred on a specific path.
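The Runnable pattern from item 2 above might look roughly like the following: the blocking `take()` loop lives on its own thread and forwards each event to a function. This is a minimal sketch under assumptions: the class name `WatchLoop` is hypothetical, `onEvent` stands in for sending a message (such as Schwatcher's EventAtPath) to an actor, and it assumes Scala 2.13+ for `scala.jdk.CollectionConverters`.

```scala
import java.nio.file.{ClosedWatchServiceException, FileSystems, Path, StandardWatchEventKinds, WatchEvent}
import scala.jdk.CollectionConverters._

/** Hypothetical sketch of a blocking WatchService loop wrapped in a Runnable.
  * `onEvent` is a stand-in for sending a message to an actor. */
class WatchLoop(dirs: Seq[Path], onEvent: (WatchEvent.Kind[_], Path) => Unit) extends Runnable {
  private val watcher = FileSystems.getDefault.newWatchService()

  // Register before the thread starts so no early events are missed.
  dirs.foreach(_.register(watcher,
    StandardWatchEventKinds.ENTRY_CREATE,
    StandardWatchEventKinds.ENTRY_MODIFY,
    StandardWatchEventKinds.ENTRY_DELETE))

  def run(): Unit =
    try {
      while (!Thread.currentThread().isInterrupted()) {
        val key = watcher.take() // blocks this thread, not the caller's
        val dir = key.watchable.asInstanceOf[Path]
        key.pollEvents().asScala.foreach { ev =>
          // OVERFLOW has no path context, so it is skipped here.
          if (ev.kind != StandardWatchEventKinds.OVERFLOW)
            onEvent(ev.kind, dir.resolve(ev.context.asInstanceOf[Path]))
        }
        key.reset()
      }
    } catch {
      // close() from another thread makes a blocked take() throw
      // ClosedWatchServiceException; interrupting it throws InterruptedException.
      case _: ClosedWatchServiceException | _: InterruptedException => ()
    }

  def stop(): Unit = watcher.close()
}
```

To use it, wrap it in a `Thread`, call `start()`, and call `stop()` when done: closing the `WatchService` wakes the blocked `take()` with `ClosedWatchServiceException`, which ends the loop.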

Basic workflow (a.k.a. how to use)

This post won’t go into much detail on how to use the library, because that is documented on the Schwatcher Github page and will probably change over time, but this is the basic workflow:

  1. Instantiate a MonitorActor
  2. Register callbacks by sending RegisterCallback messages to the MonitorActor, passing in a path and an event type.
  3. Carry on: your callbacks will be called when events happen.

The project is Mavenised and is available from the Central Repository, so simply add the libraryDependency to your build.sbt (by the time you read this the version may be different, so refer to the project’s Github page):

libraryDependencies += "com.beachape.filemanagement" %% "schwatcher" % "0.0.2"

And to use it,

Schwatcher example code (schwatcher_example.scala)
import akka.actor.ActorSystem
import com.beachape.filemanagement.MonitorActor
import com.beachape.filemanagement.RegistryTypes._
import com.beachape.filemanagement.Messages._

import java.io.{FileWriter, BufferedWriter}

import java.nio.file.Paths
import java.nio.file.StandardWatchEventKinds._

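// Create the actor system and a MonitorActor; concurrency = 2 lets up to two
// callbacks run at the same time
implicit val system = ActorSystem("actorSystem")
val fileMonitorActor = system.actorOf(MonitorActor(concurrency = 2))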

val modifyCallbackFile: Callback = {
  path => println(s"Something was modified in a file: $path")
}
val modifyCallbackDirectory: Callback = {
  path => println(s"Something was modified in a directory: $path")
}

val desktop = Paths get "/Users/lloyd/Desktop"
val desktopFile = Paths get "/Users/lloyd/Desktop/test"

/*
  This will receive callbacks for just the one file
 */
fileMonitorActor ! RegisterCallback(
  ENTRY_MODIFY,
  recursive = false,
  path = desktopFile,
  modifyCallbackFile)

/*
  This registers the desktop directory itself (recursive = false), so it
  receives callbacks for entries directly inside the desktop directory.
  Modifying desktopFile will therefore trigger this callback as well.
*/
fileMonitorActor ! RegisterCallback(
  ENTRY_MODIFY,
  recursive = false,
  path = desktop,
  modifyCallbackDirectory)


// Modify a monitored file
val writer = new BufferedWriter(new FileWriter(desktopFile.toFile))
writer.write("Theres text in here wee!!")
writer.close()

// #=> Something was modified in a file: /Users/lloyd/Desktop/test
//     Something was modified in a directory: /Users/lloyd/Desktop/test
// (callbacks run concurrently, so the two lines may appear in either order)

Behind the scenes workflow

A few things happen behind the scenes (accurate at time of writing):

  1. Upon the MonitorActor’s instantiation, a private WatchServiceTask Runnable object is created and its accompanying WatchService thread (the blocking thread that takes events from the Java 7 WatchService) is started. The MonitorActor is in charge of keeping tabs on (starting and stopping) the WatchService thread.
  2. The MonitorActor also instantiates a Map of type [EventType, Agent[CallbackRegistry]]. CallbackRegistry objects are themselves immutable Maps of type [Path, List[Path => Unit]]. Each CallbackRegistry is held inside an Agent so that concurrent updates are applied atomically.
  3. When a Path and a file system event type are registered with a Callback function, a RegisterCallback message is sent to the MonitorActor, which in turn sends an update to the Agent holding the CallbackRegistry for that event type. Adding callback functions or paths to a CallbackRegistry creates a new registry containing the (new) path and its new accompanying List[Callback], while leaving the old one untouched. Un-registering a path’s callbacks works the same way (but in reverse).
  4. If recursive is set to true in the RegisterCallback or UnRegisterCallback message and the initial path given is a directory, then that directory’s tree is walked and each directory in it is registered (or unregistered) with the callbacks.
  5. When an event is picked up from the Java 7 WatchService within the WatchService thread (mentioned in 1), an EventAtPath message containing the event type and the path of the event is sent from that thread to its parent MonitorActor.
  6. The MonitorActor receives the EventAtPath message, looks up the list of callbacks for that event type and path, and sends each callback, packaged inside a PerformCallback message, to its pool of CallbackActors via a SmallestMailbox router. Thus, callbacks are handled concurrently (or, if desired, one at a time, by setting the MonitorActor’s concurrency parameter to 1 when instantiating it).
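Items 2, 3 and 6 above can be approximated in plain Scala to show why immutability matters. This is a hypothetical sketch, not Schwatcher’s source: `CallbackRegistry` here is just a case class over `Map[Path, List[Callback]]`, an `AtomicReference` stands in for the Akka `Agent` (an Agent applies updates asynchronously, whereas this applies them synchronously), and `dispatch` runs callbacks on the calling thread instead of routing `PerformCallback` messages to `CallbackActor`s.

```scala
import java.nio.file.Path
import java.util.concurrent.atomic.AtomicReference

// Hypothetical sketch, not Schwatcher's source: an immutable callback registry,
// an atomically swapped holder for it, and a synchronous dispatch step.
object RegistrySketch {
  type Callback = Path => Unit

  /** Immutable registry: every update returns a new instance, the old one is untouched. */
  final case class CallbackRegistry(map: Map[Path, List[Callback]] = Map.empty) {
    def withCallback(path: Path, cb: Callback): CallbackRegistry =
      copy(map = map.updated(path, map.getOrElse(path, Nil) :+ cb))

    def withoutCallbacks(path: Path): CallbackRegistry =
      copy(map = map - path)

    def callbacksFor(path: Path): List[Callback] = map.getOrElse(path, Nil)
  }

  /** Stand-in for Agent[CallbackRegistry]: each update atomically replaces the
    * whole registry, so readers always see a complete, consistent snapshot. */
  final class RegistryRef {
    private val ref = new AtomicReference(CallbackRegistry())
    def update(f: CallbackRegistry => CallbackRegistry): Unit = {
      ref.updateAndGet(r => f(r)) // f may be retried under contention, so keep it pure
      ()
    }
    def get: CallbackRegistry = ref.get
  }

  /** Runs every callback registered for `path` on the calling thread and
    * returns how many ran. */
  def dispatch(registry: CallbackRegistry, path: Path): Int = {
    val callbacks = registry.callbacksFor(path)
    callbacks.foreach(cb => cb(path))
    callbacks.size
  }
}
```

Because `withCallback` leaves the original instance unchanged, a reader holding an older snapshot can keep dispatching from it safely while an update is in flight.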

Conclusion

Hopefully, Schwatcher is useful for Scala developers looking to monitor the file system. Questions, pull requests and feedback are greatly appreciated!

Publishing to Maven

As a side-note, when publishing this library to Maven via Sonatype, I found the following links very helpful:

  1. Official Scala-sbt guide to Deploying to Sonatype
  2. Sonatype OSS Maven Repository usage guide
