In a previous post, I talked about SBT-OpenCV, a plugin for SBT that makes it easy to get started with OpenCV in any SBT-defined JVM app using just one line in project/plugins.sbt. Having handled the issue of getting the proper dependencies into a project, we can turn our attention to actually using the libraries to do something cool.

This post is the beginning of a series, where the end goal is to build a smile detector. Akka and OpenCV will be used, with Spark joining later on to complete the buzzwords treble.

A well-rounded and fun first step is to get a video feed from a webcam showing on our screen. To do this, we will cover a variety of things, including how to define a custom Akka Source, how to use JavaCV, and some basic OpenCV image manipulation utilities.

Akka Streams

Many of the OpenCV tutorials floating around on the interwebs use a procedural approach, perhaps because it better fits the programming language of the tutorial, or for performance reasons. In this series of posts, we will instead adopt a stream processing model, specifically in the manner of Reactive Streams.
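For a sense of the contrast, a typical procedural capture loop looks something like this (a sketch using JavaCV classes that we'll meet properly below; not the approach we'll be taking):

import org.bytedeco.javacv.{ CanvasFrame, OpenCVFrameGrabber }

// Procedural style: one explicit, blocking grab-and-show loop
val grabber = new OpenCVFrameGrabber(0) // device 0 is usually the built-in webcam
grabber.start()
val canvas = new CanvasFrame("Webcam")
while (canvas.isVisible) {
  canvas.showImage(grabber.grab()) // grab() blocks until the next frame is available
}
grabber.stop()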

There are many benefits of using the Reactive Stream model (this blog post, and this slide deck by Roland Kuhn are great places to start reading), but the main ones I feel are relevant for us are:

  1. Simplicity: by turning data processing into a series of simple stateless transformations, your code is easy to maintain, easy to change, and easy to understand: in other words, it becomes agile (relax: your code, not your team…).

  2. Backpressure: Reactive Streams implementations ensure that backpressure is handled automatically (when downstream transforms take too long, upstream is informed so that it doesn’t overload your system).

  3. Asynchronous: Reactive Streams run asynchronously by default, leaving your main thread(s) responsive.

In Scala, Akka-Streams is the de facto implementation of the Reactive Streams spec, and although it is labelled experimental, its widespread adoption looks imminent (for example, there is already a Play integration, and the innards of Play are being rewritten to use Akka-Http, which is itself based on Akka-Streams). Another nice Reactive Streams implementation in Scala is Monix, which offers a (subjectively) cleaner interface that will be more familiar to people coming from RxScala/RxJava.

For the purposes of this tutorial, we will be using Akka-Streams because it seems to have a higher chance of widespread adoption.
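To get a feel for the model before we deal with webcams, here is a minimal Akka-Streams pipeline (a sketch; nothing webcam-specific yet):

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()

// Emit the numbers 1 to 10, double each one, and print the results.
// Demand flows from the sink upstream, so the source only emits as fast
// as downstream consumes: backpressure for free.
Source(1 to 10)
  .map(_ * 2)
  .runWith(Sink.foreach(i => println(i)))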

Note that this tutorial was written against an experimental version of Akka Streams.

Flow chart

Aside from wrapping OpenCV, JavaCV comes with a number of useful classes. One such class is CanvasFrame, which is a hardware-accelerated Swing Frame implementation for showing images. CanvasFrame’s .showImage method accepts a Frame, which is the exact same type that OpenCVFrameGrabber (another useful JavaCV class) returns from its .grab() method.

Before showing the image, we will flip it so that the feed we see on screen moves in the direction we expect. This requires converting the Frame into a Mat (a wrapper type for OpenCV’s native matrix), doing the actual flipping of the matrix, converting the Mat back into a Frame, and then showing it on the CanvasFrame.

In short, our pipeline looks something like this:

Webcam Source[Frame] → convert to Mat → flip horizontally → convert back to Frame → show on CanvasFrame → Sink

The Source

As the diagram suggests, the first thing we need is a Source that produces Frames; in other words, a Source[Frame].

The OpenCVFrameGrabber API for grabbing frames from a webcam is fairly simple: you instantiate one, passing in an Int for the device id of the webcam (usually 0), optionally pass some settings to it, and then call start to initialise the grabber. Afterwards, it is simply a matter of calling .grab() to obtain a Frame.

// A sketch of grabber setup; deviceId, imageWidth, etc. are placeholder values
import org.bytedeco.javacv.OpenCVFrameGrabber

val grabber = new OpenCVFrameGrabber(deviceId)
grabber.setImageWidth(imageWidth)
grabber.setImageHeight(imageHeight)
grabber.setBitsPerPixel(bitsPerPixel)
grabber.setImageMode(imageMode)
grabber.start()

//...

grabber.grab() // returns a Frame

In order to create an Akka Source[Frame], we will make use of the Akka-provided ActorPublisher class, which provides helper methods that specifically make it easy to send data only when there is downstream demand (this is how backpressure is automagically handled).

In the actor’s receive method, we match on:

  • the Request message type, upon which we call emitFrames()
  • a custom Continue object, which also triggers emitFrames()
  • Cancel, which tells us when to stop the actor.

The emitFrames() method checks whether the actor is currently active (i.e. whether it has any subscribers), and if so, grabs a frame and passes it to the onNext helper method from ActorPublisher to send it downstream. It then checks whether totalDemand (another ActorPublisher member) is still greater than 0, and if so sends itself a Continue message, which invokes emitFrames() again. This somewhat convoluted way of sending data downstream is required because grabber.grab() is a blocking call, and we don’t want to block the actor threadpool for too long at a time (the same pattern is used by the built-in InputStreamPublisher).

In order to make a Source[Frame], we create an instance of our actor, pass its ActorRef to a method that wraps it in a Publisher[Frame], and then pass that publisher to a method that builds a Source[Frame].

For the purposes of keeping our API clean, we make it a private class and expose only a static method for creating a source.

Webcam Source[Frame] (WebcamSource.scala)
package com.beachape.video

import akka.actor.{ DeadLetterSuppression, Props, ActorSystem, ActorLogging }
import akka.stream.actor.ActorPublisher
import akka.stream.actor.ActorPublisherMessage.{ Cancel, Request }
import akka.stream.scaladsl.Source
import org.bytedeco.javacpp.opencv_core._
import org.bytedeco.javacv.{ FrameGrabber, Frame }
import org.bytedeco.javacv.FrameGrabber.ImageMode

/**
 * Created by Lloyd on 2/13/16.
 */

object Webcam {

  /**
   * Builds a Frame [[Source]]
   *
   * @param deviceId device ID for the webcam
   * @param dimensions image width and height in pixels
   * @param bitsPerPixel bits per pixel of the feed (defaults to CV_8U)
   * @param imageMode colour or grey scale mode (defaults to COLOR)
   * @param system ActorSystem
   * @return a Source of [[Frame]]s
   */
  def source(
    deviceId: Int,
    dimensions: Dimensions,
    bitsPerPixel: Int = CV_8U,
    imageMode: ImageMode = ImageMode.COLOR
  )(implicit system: ActorSystem): Source[Frame, Unit] = {
    val props = Props(
      new WebcamFramePublisher(
        deviceId = deviceId,
        imageWidth = dimensions.width,
        imageHeight = dimensions.height,
        bitsPerPixel = bitsPerPixel,
        imageMode = imageMode
      )
    )
    val webcamActorRef = system.actorOf(props)
    val webcamActorPublisher = ActorPublisher[Frame](webcamActorRef)

    Source.fromPublisher(webcamActorPublisher)
  }

  // Building a started grabber seems finicky if not synchronised; there may be some freaky stuff happening somewhere.
  private def buildGrabber(
    deviceId: Int,
    imageWidth: Int,
    imageHeight: Int,
    bitsPerPixel: Int,
    imageMode: ImageMode
  ): FrameGrabber = synchronized {
    val g = FrameGrabber.createDefault(deviceId)
    g.setImageWidth(imageWidth)
    g.setImageHeight(imageHeight)
    g.setBitsPerPixel(bitsPerPixel)
    g.setImageMode(imageMode)
    g.start()
    g
  }

  /**
   * Actor that backs the Akka Stream source
   */
  private class WebcamFramePublisher(
      deviceId: Int,
      imageWidth: Int,
      imageHeight: Int,
      bitsPerPixel: Int,
      imageMode: ImageMode
  ) extends ActorPublisher[Frame] with ActorLogging {

    private implicit val ec = context.dispatcher

    // Lazy so that nothing happens until the flow begins
    private lazy val grabber = buildGrabber(
      deviceId = deviceId,
      imageWidth = imageWidth,
      imageHeight = imageHeight,
      bitsPerPixel = bitsPerPixel,
      imageMode = imageMode
    )

    def receive: Receive = {
      case _: Request => emitFrames()
      case Continue => emitFrames()
      case Cancel => onCompleteThenStop()
      case unexpectedMsg => log.warning(s"Unexpected message: $unexpectedMsg")
    }

    private def emitFrames(): Unit = {
      if (isActive && totalDemand > 0) {
        /*
          Grabbing a frame is a blocking I/O operation, so we don't send too many at once.
         */
        grabFrame().foreach(onNext)
        if (totalDemand > 0) {
          self ! Continue
        }
      }
    }

    private def grabFrame(): Option[Frame] = {
      Option(grabber.grab())
    }
  }

  private case object Continue extends DeadLetterSuppression

}

We’ll also define a simple Dimensions case class to make things a bit clearer (keyword arguments FTW)

Tuple-like class for holding dimensions (Dimensions.scala)
/**
 * Tuple-like class for holding width and height in pixels
 */
case class Dimensions(width: Int, height: Int)
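With the source and Dimensions in hand, a quick smoke test is to run the source into a trivial sink (a sketch; it assumes an ActorSystem and materializer in scope, as in the full app at the end of this post):

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()

// Print each frame's dimensions as it arrives; frames are only grabbed on demand
Webcam
  .source(deviceId = 0, dimensions = Dimensions(width = 640, height = 480))
  .runForeach(frame => println(s"Got a ${frame.imageWidth}x${frame.imageHeight} frame"))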

Conversion

In order to begin processing our feed with OpenCV, we first need to transform our Frame (a JavaCV type) into a Mat, JavaCV’s wrapper for OpenCV’s main representation of images, the matrix. Fortunately, JavaCV has an OpenCVFrameConverter.ToMat helper class that does exactly this. Since the class uses a mutable private field for holding on to temporary results, it normally isn’t advisable to use it in multithreaded code unless we make new copies of it each time, but we can make it thread safe by binding it to a ThreadLocal.

Media conversion utility methods (MediaConversion.scala)
import java.util.function.Supplier

import org.bytedeco.javacpp.opencv_core.Mat
import org.bytedeco.javacv.{ Frame, OpenCVFrameConverter }

/**
 * Holds conversion and transformation methods for media types
 */
object MediaConversion {

  // Each thread gets its own converter, since ToMat holds mutable state and is not thread safe
  private val frameToMatConverter = ThreadLocal.withInitial(new Supplier[OpenCVFrameConverter.ToMat] {
    def get(): OpenCVFrameConverter.ToMat = new OpenCVFrameConverter.ToMat
  })

  /**
   * Returns an OpenCV Mat for a given JavaCV frame
   */
  def toMat(frame: Frame): Mat = frameToMatConverter.get().convert(frame)

  /**
   * Returns a JavaCV Frame for a given OpenCV Mat
   */
  def toFrame(mat: Mat): Frame = frameToMatConverter.get().convert(mat)

}
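Usage is symmetric in both directions; for instance (a sketch, reusing the grabber setup from earlier in the post):

import org.bytedeco.javacv.OpenCVFrameGrabber

val grabber = new OpenCVFrameGrabber(0)
grabber.start()

// Round-trip: JavaCV Frame → OpenCV Mat → JavaCV Frame
val frame = grabber.grab()
val mat = MediaConversion.toMat(frame)         // now usable with OpenCV functions
val backToFrame = MediaConversion.toFrame(mat) // ready for CanvasFrame.showImage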

Manipulation

Once we have our Mat, we can use OpenCV methods to do manipulation. One caveat: by default (perhaps for efficiency), these methods mutate the original object. This can cause strange issues in a multi-threaded, multi-path Flow graph, so instead of using them as-is, we make use of the convenient clone method before doing our flip, so that the original matrix remains untouched.

(Flip.scala)
import org.bytedeco.javacpp.opencv_core.{ Mat, flip }

object Flip {

  /**
   * Clones the image and returns a flipped version of the given image matrix along the y axis (horizontally)
   */
  def horizontal(mat: Mat): Mat = {
    val cloned = mat.clone()
    flip(cloned, cloned, 1) // flip code 1 flips around the y axis
    cloned
  }

}
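As an aside, OpenCV’s flip chooses its axis via the flip code argument: 0 flips around the x axis (vertically), positive values around the y axis (horizontally), and negative values around both. A vertical variant, if we ever needed one, would look like this (a hypothetical sketch, not part of this post’s code):

import org.bytedeco.javacpp.opencv_core.{ Mat, flip }

// Hypothetical companion to Flip.horizontal, for illustration only
def vertical(mat: Mat): Mat = {
  val cloned = mat.clone()
  flip(cloned, cloned, 0) // flip code 0 flips around the x axis
  cloned
}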

Hooking things up

Now that we have all our components, all we need to do is create a simple application that instantiates all our components and hooks them all together:

  1. Instantiate our ActorSystem and Materializer
  2. Instantiate a CanvasFrame
  3. Instantiate our Source[Frame]
  4. Define our graph by using our components to transform the source
  5. Run the graph
Webcam feed app (WebcamWindow.scala)
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import org.bytedeco.javacv.CanvasFrame
// plus this project's own Webcam, Dimensions, MediaConversion and Flip

object WebcamWindow extends App {

  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()

  val canvas = new CanvasFrame("Webcam")
  //  Set Canvas frame to close on exit
  canvas.setDefaultCloseOperation(javax.swing.JFrame.EXIT_ON_CLOSE)

  val imageDimensions = Dimensions(width = 640, height = 480)
  val webcamSource = Webcam.source(deviceId = 0, dimensions = imageDimensions)

  val graph = webcamSource
    .map(MediaConversion.toMat) // most OpenCV manipulations require a Matrix
    .map(Flip.horizontal)
    .map(MediaConversion.toFrame) // convert back to a frame
    .map(canvas.showImage)
    .to(Sink.ignore)

  graph.run()

}

Looking at the code, one of the rewards of using the stream processing model over the procedural approach might jump out at you: the near one-to-one correspondence that the graph definition has with our earlier diagram.

Conclusion

So, with that we should now have a very simple app that shows what your webcam sees, flipped so that when you move left, the image moves with you. We’ve done it by declaring a custom Akka Stream Source and transforming it a little bit before shoving it onto the screen.

In the next post, we will look at how to do something a bit more complex: face detection using OpenCV.

Note: the code for this post is on GitHub.

Credits

  1. Playing with OpenCV in Scala to do face detection with Haarcascade classifier using a webcam
