How Backpressure Works in Combine

I have recorded a video to describe how backpressure works in Combine.

A Tour of Combine - Slide Deck

This is the slide deck for the presentation that I gave on Apple’s new Combine framework at the iOS Developers Perth meetup. We go over the main concepts and types, and also explore how Combine implements backpressure.

A Tour of Combine from Vy-Shane Xie

Announcing CombineGRPC, a library that integrates Swift gRPC and Combine to enable responsive SwiftUI apps

12 September 2019

I have updated the code examples in this blog post to match version 0.6.0 of CombineGRPC.

I was pretty excited when Apple announced SwiftUI and Combine at WWDC this year. I have experimented with nested components and RxSwift in the past and I have been wanting to use something like this on Apple’s platforms.

I am a big fan of gRPC. gRPC and Protocol Buffers have been game changers for us at WiseTime. In terms of developer ergonomics, they give us a very lightweight way of defining APIs, implementing them and calling them. The built-in streaming support means that it is just as easy to implement asynchronous messaging (push) as it is to implement request/response.

I have been keeping an eye on the Swift gRPC project, and when they released their first 1.0.0-alpha version based on SwiftNIO, I decided that the world was ready for CombineGRPC, a library that integrates Swift gRPC and the Combine framework. I dreamt of beautiful, responsive UIs; of streaming data straight to my lists as the user scrolled. Then I woke up and got to work.

import CombineGRPC

Writing CombineGRPC required some experimentation. Documentation on the NIO branch of Swift gRPC wasn’t fully fleshed out yet, and there were not many resources on the Combine framework at the time. I mostly followed the types, asked dumb questions, and validated my hypotheses with test scenarios. I found and reported one bug in Swift gRPC. It was very quickly fixed upstream.

Taking CombineGRPC Out for a Spin

Here are the steps for specifying, implementing and calling a gRPC service:

  1. Write your service definition using the Protocol Buffers interface definition language
  2. Use the protoc compiler and the Swift Protobuf plugin to generate the Swift types for the messages defined in your .proto file
  3. Use the protoc compiler and the Swift gRPC plugin to generate the service protocols and Swift client that you can use to call the service
  4. Use the handle functions that are provided by CombineGRPC to implement your RPCs by making use of Combine publishers
  5. Create a GRPCExecutor and use its call methods to interact with your gRPC service

Let’s see what this looks like in practice. In the following example, we define an EchoService that simply echoes back all the request messages that it receives. We’ll use it to demonstrate how easy it is to set up bidirectional streaming between a server and a client.

syntax = "proto3";

/*
 * A simple bidirectional streaming RPC that takes a request stream
 * as input and echoes back all the messages in an output stream.
 */
service EchoService {
  rpc SayItBack (stream EchoRequest) returns (stream EchoResponse);
}

message EchoRequest {
  string message = 1;
}

message EchoResponse {
  string message = 1;
}

To generate Swift code from the protobuf, first install the protoc plugins for Swift and Swift gRPC, and then run:

protoc echo_service.proto --swift_out=Generated/
protoc echo_service.proto --swiftgrpc_out=Generated/

Let’s Write a gRPC Server

If you are using SPM, you can add CombineGRPC to your project by listing it as a dependency in Package.swift:

dependencies: [
  .package(url: "", from: "0.6.0"),
],

You are now ready to implement the server-side gRPC service. To do so, implement the protocol that Swift gRPC generated for the service, and use the CombineGRPC handle function. You provide it with a handler function that accepts a Combine publisher of requests, AnyPublisher<EchoRequest, Error>, and returns a publisher of responses, AnyPublisher<EchoResponse, GRPCStatus>. Notice that the output stream may fail with a GRPCStatus error.

import Foundation
import Combine
import CombineGRPC
import GRPC
import NIO

class EchoServiceProvider: EchoProvider {
  func sayItBack(context: StreamingResponseCallContext<EchoResponse>) ->
    EventLoopFuture<(StreamEvent<EchoRequest>) -> Void>
  {
    handle(context) { requests in
      requests
        .map { req in
          EchoResponse.with { $0.message = req.message }
        }
        .setFailureType(to: GRPCStatus.self)
        .eraseToAnyPublisher()
    }
  }
}

Our implementation is simple enough. We map over the request stream and write the input messages into the output stream. CombineGRPC provides handle functions for each RPC type. There is a version for unary, server streaming, client streaming and bidirectional streaming RPCs.

To start the gRPC server, we use the Swift gRPC incantation:

let configuration = Server.Configuration(
  target: ConnectionTarget.hostAndPort("localhost", 8080),
  eventLoopGroup: PlatformSupport.makeEventLoopGroup(loopCount: 1),
  serviceProviders: [EchoServiceProvider()]
)
_ = try Server.start(configuration: configuration).wait()

Let it Flow: Calling our Bidirectional Streaming RPC

Now let’s set up our client. Again, it’s the same process that you would go through when using Swift gRPC.

let configuration = ClientConnection.Configuration(
  target: ConnectionTarget.hostAndPort("localhost", 8080),
  eventLoopGroup: PlatformSupport.makeEventLoopGroup(loopCount: 1)
)
let echoClient = EchoServiceClient(connection: ClientConnection(configuration: configuration))

To call the service, create a GRPCExecutor and use its call method. call is curried. You first configure it with the RPC that you want to call - echoClient.sayItBack. The client with the method sayItBack was generated from our protobuf definition by Swift gRPC.

The bidirectional streaming version of the call function then takes as parameter a stream of requests AnyPublisher<Request, Error> and returns a stream AnyPublisher<Response, GRPCStatus> of responses from the server. Let’s verify that our server does what it’s supposed to do:

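let requests = repeatElement(EchoRequest.with { $0.message = "hello"}, count: 10)
let requestStream: AnyPublisher<EchoRequest, Error> =
  Publishers.Sequence(sequence: requests).eraseToAnyPublisher()
let grpc = GRPCExecutor()

// Keep a reference to the subscription so that it is not cancelled immediately
let cancellable = grpc.call(echoClient.sayItBack)(requestStream)
  .filter { $0.message == "hello" }
  .count()
  // The response stream can fail, so sink needs a completion handler too
  .sink(receiveCompletion: { _ in }, receiveValue: { count in
    assert(count == 10)
  })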

That’s it! You have set up bidirectional streaming between a gRPC server and client.

The Types of CombineGRPC

CombineGRPC provides versions of call and handle for all four RPC styles. call and handle are symmetrical. What you provide to call is given to your handler via handle, and what you output from your handler is what call will return when you call your RPC. Therefore, everything that you need to know about CombineGRPC is in the following table.

RPC Style                Input and Output Types
Unary                    Request -> AnyPublisher<Response, GRPCStatus>
Server streaming         Request -> AnyPublisher<Response, GRPCStatus>
Client streaming         AnyPublisher<Request, Error> -> AnyPublisher<Response, GRPCStatus>
Bidirectional streaming  AnyPublisher<Request, Error> -> AnyPublisher<Response, GRPCStatus>

When you make a unary call, you provide a request message, and get back a response publisher. The response publisher will either publish a single response, or fail with a GRPCStatus error. Similarly, if you are handling a unary RPC call, you provide a handler that takes a request parameter and returns an AnyPublisher<Response, GRPCStatus>.

You can follow the same intuition to understand the types for the other RPC styles. The only difference is that publishers for the streaming RPCs may publish zero or more messages instead of the single response message that is expected from the unary response publisher.

(flat)Map All the Things?

I’m sold, should I use CombineGRPC in my app? Not yet. The Combine framework is still in beta. The NIO version of Swift gRPC is still in alpha. All the operating systems that support Combine are currently in beta. I consider CombineGRPC to be in preview stage. It’s reached the point where its API is fleshed out enough that I feel comfortable soliciting feedback without wasting people’s time.

So, do let me know if you like the direction, have any questions or have suggestions.

The repository is hosted on GitHub at

How to Set a Static IP Address in Ubuntu 18.04 Bionic Beaver

Make your networking changes in /etc/cloud/cloud.cfg.d/50-curtin-networking.cfg. For example:

  version: 2
  renderer: networkd
     dhcp4: no
     addresses: []
       addresses: [,,]

Then re-run cloud-init and apply the new configuration:

sudo cloud-init clean
sudo cloud-init init
sudo netplan apply

If you have multiple network cards and only one of them is connected, you can set the unused card as optional so that the system doesn’t wait for a long time for it to come up at boot. For example:

  version: 2
  renderer: networkd
     dhcp4: yes
     optional: true
     dhcp4: no
     addresses: []
       addresses: [,,]

Overengineering Circa 2019

FoundationDB Integration Tests in Scala

When Apple announced the FoundationDB Record Layer, I was keen to take it for a test drive.

The first order of business is to figure out a way to write integration tests. The docker-it-scala project gives us an easy way to launch Docker containers for testing, and the FoundationDB project provides an official Docker image.

Unfortunately, we can’t use the FoundationDB Docker image out of the box as it doesn’t come with a pre-configured database. To get around this, we can implement a custom ready checker that gives us the ability to run arbitrary fdbcli commands on startup.

Here’s our implementation of the FoundationDB Docker test kit. We’re using the Spotify Docker client.

import com.spotify.docker.client.messages.PortBinding
import com.whisk.docker.testkit.ContainerState.{HasId, Ready}
import com.whisk.docker.testkit.{
  BaseContainer,
  ContainerCommandExecutor,
  ContainerSpec,
  DockerReadyChecker,
  FailFastCheckException,
  ManagedContainers
}
import com.whisk.docker.testkit.scalatest.DockerTestKitForAll
import org.scalatest.Suite
import scala.concurrent.{ExecutionContext, Future}

/**
 * Provides a FoundationDB Docker container for integration tests
 */
trait FoundationDbDockerTestKit extends DockerTestKitForAll {
  self: Suite =>

  val fdb = FDBDatabaseFactory
    // Assumes a src/test/resources/fdb.cluster file with the following contents:
    // docker:docker@

  private val fdbPort = 4500

  private lazy val fdbContainer = ContainerSpec("foundationdb/foundationdb:latest")
    .withPortBindings(fdbPort -> PortBinding.of("", fdbPort))
    .withEnv("FDB_NETWORKING_MODE=host", s"FDB_PORT=$fdbPort")
    // The FoundationDB Docker image doesn't come with a pre-configured database
    .withReadyChecker(new FdbDockerReadyChecker("configure new single memory"))

  override val managedContainers: ManagedContainers = fdbContainer.toManagedContainer
}

We use a custom FdbDockerReadyChecker that runs fdbcli configure new single memory once FoundationDB is up and running. This creates a new database that uses the memory storage engine and single (no replication) mode. This is fine for integration tests.

Our ready checker looks like this:

/**
 * Ready checker for FoundationDB container, with the ability to run a fdbcli --exec command
 * once FoundationDB has started.
 */
class FdbDockerReadyChecker(onReadyFdbcliExec: String = "status") extends DockerReadyChecker {

  override def apply(container: BaseContainer)(implicit docker: ContainerCommandExecutor,
                                               ec: ExecutionContext): Future[Unit] = {
    val execOnReady: (String) => Future[Unit] = (containerId) => {
      Future {
          .execCreate(containerId, Array("/usr/bin/fdbcli", "--exec", onReadyFdbcliExec))
      } map { exec =>
      } map (_ => ())

    container.state() match {
      case Ready(info) =>
      case state: HasId =>
          .withLogStreamLinesRequirement(, withErr = true)(
            _.contains("FDBD joined cluster.")
          .flatMap(_ => execOnReady(
      case _ =>
          new FailFastCheckException("Can't initialise LogStream to container without ID")

To use the above, you’ll need a fdb.cluster file with the following contents in your test resources directory:


Finally, here’s what an integration test might look like:

import org.scalatest.{AsyncWordSpec, Matchers}

/**
 * Integration tests for a SampleRepository
 * The tests in this spec need a Docker engine to run FoundationDB
 */
class SampleRepositorySpec extends AsyncWordSpec with Matchers with FoundationDbDockerTestKit {

  val repository = new SampleRepository(fdb)

  "SampleRepository" when {
    "asked to get a record that doesn't exist" should {
      "return empty option" in {
        repository.get("inexistantrecord").runAsync map { r =>
          r shouldEqual None
        }
      }
    }
  }
}

Here & Now

Here & Now is a weather app that I made to experiment with an iOS app architecture based on nestable components and RxSwift.

View controllers are deliberately minimal. They wire up their root ViewComponent and add its subview. Nested ViewComponents are used to break the UI code down into smaller logical pieces.

A ViewComponent is defined as:

protocol ViewComponent {
  // Streams used by the component
  associatedtype Inputs
  // Streams produced by the component
  associatedtype Outputs
  // Root view of the component, add as subview of parent component
  var view: UIView { get }
  init(disposedBy: DisposeBag)
  // Subscribe to input streams, export streams produced by component
  func start(_ inputs: Inputs) -> Outputs
}

extension ViewComponent {
  // Stop any services started by the component
  func stop() {}
}

Core UI logic is written in component protocol extensions for ease of testing. UI state changes are implemented as pure functions that operate on Rx types like Observable. For example, the map style is a function of the current time at the map location. A light style is used when it’s day time at the map location, and a dark style when it’s night time.

extension MapComponent {

  func mapStyle(forCameraPosition: Observable<GMSCameraPosition>,
                date: Observable<Date>) -> Observable<MapStyle> {
    let location =
    return uiScheme(forLocation: location, date: date)
      .map { $ }

  func uiScheme(forLocation: Observable<CLLocation>,
                date: Observable<Date>) -> Observable<UIScheme> {
    return Observable
      .combineLatest(forLocation, date) { (l, d) in
        if let dayTime = isDaytime(date: d, coordinate: l.coordinate) {
          return dayTime ? .light : .dark
        return .light

  // ...

The source code for Here & Now is available on GitHub.

Multistage Docker Builds for Scala Applications

The following Dockerfile gives an example of a multistage build that runs sbt in a builder container. This means that users don’t need to install Scala tooling on their machines in order to build the project. To optimise build times, I cache dependencies first by running sbt update in a precursor step to sbt stage.

FROM hseeberger/scala-sbt:8u181_2.12.7_1.2.6 as builder
WORKDIR /build
# Cache dependencies first
COPY project project
COPY build.sbt .
RUN sbt update
# Then build
COPY . .
RUN sbt stage
# Download Geonames file
RUN wget
RUN unzip

FROM openjdk:8u181-jre-slim
COPY --from=builder /build/target/universal/stage/. .
COPY --from=builder /build/cities500.txt .
ENV PLACES_FILE_PATH=/app/cities500.txt
RUN mv bin/$(ls bin | grep -v .bat) bin/start
CMD ["./bin/start"]

The final image only has the JRE, and no build tools.
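The RUN mv step in the final stage is worth a closer look. The staged bin directory is assumed to hold two launchers, a Unix script and a Windows .bat file, both named after the project. Renaming the Unix one to bin/start keeps CMD independent of the project name. Here is a small sketch that replays the same command in a throwaway directory; the launcher name my-service is made up for illustration.

```shell
#!/bin/sh
# Recreate the assumed layout: a Unix launcher next to its Windows .bat twin.
# "my-service" is a hypothetical project name.
workdir=$(mktemp -d)
mkdir "$workdir/bin"
touch "$workdir/bin/my-service" "$workdir/bin/my-service.bat"

listing=$(
  cd "$workdir" || exit 1
  # Same command as in the Dockerfile: select the entry that is not the
  # .bat file and give it a fixed name.
  mv bin/$(ls bin | grep -v .bat) bin/start
  ls bin | sort | paste -sd' ' -
)
rm -rf "$workdir"

echo "$listing"
```

Note that the dot in .bat is a regular expression wildcard here, so the filter relies on the launcher name not containing any character followed by "bat". A stricter pattern such as '\.bat$' would avoid that.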

A Reverse Geocoding gRPC Service Written in Scala

I made a reverse geocoder gRPC server as a demo of how one might structure a backend service in Scala. I structured the application to have a purely functional core, with an imperative shell.

However, there’s a twist to the plot. I’m mixing classical OOP with pure FP. I wanted to see what the code looked like if I used a dependency injection framework (Airframe) to wire up the side effects at the outer edges.

The main method is where we build the object graph:

object Main extends App with LazyLogging {

  override def main(args: Array[String]): Unit = {
    val config = loadConfigOrThrow[Config]

    // Wire up dependencies

      // Load places from disk immediately upon startup
      .bind[KDTreeMap[Location, Place]].toEagerSingletonProvider(loadPlacesBlocking)

      // Startup

    // Side effects are injected at the edge:

    lazy val fileReader: LinesFileReader = () => {"Loading places from ${config.placesFilePath}")
      val reader = new BufferedReader(
        new InputStreamReader(new FileInputStream(config.placesFilePath), "UTF-8")

    lazy val loadPlacesBlocking: PlacesLoader => KDTreeMap[Location, Place] = { loader =>
      Await.result(loader.load().runAsync, 1 minute)

    lazy val clock: Clock = {
        .interval(1 second)
        .map(_ =>

Side effects are:

  • Reading from the file system
  • The clock

fileReader gives us a stream of lines from the file, and the clock is a stream of Instants. Both are modelled using the Monix Observable type.

The Application trait is still very much imperative. We set up application status, served via Healthttpd, then start the gRPC server.

trait Application extends LazyLogging {
  private val config = bind[Config]
  private val healthttpd = bind[Healthttpd]
  private val reverseGeocoderService = bind[ReverseGeocoderService]

  def run(): Unit = {
    healthttpd.startAndIndicateNotReady()"Starting gRPC server")

    val grpcServer = NettyServerBuilder

    sys.ShutdownHookThread {


The core of the application, concerned with serving requests, is pure, and easily tested. I’m using Task as an IO monad.

class ReverseGeocodeLocationRpc(places: KDTreeMap[Location, Place], clock: Clock) {

  def handle(request: ReverseGeocodeLocationRequest): Task[ReverseGeocodeLocationResponse] = {
    findNearest(request.latitude, request.longitude)(places)
      .map(addSunTimes(_, clock).map(toResponse))

  private def findNearest(latitude: Latitude, longitude: Longitude)(places: KDTreeMap[Location, Place]): Option[Place] = {
      .findNearest((latitude, longitude), 1)

  private case class Sun(rise: Option[Timestamp], set: Option[Timestamp])

  private def addSunTimes(place: Task[Place], clock: Clock): Task[Place] = {
    Task.zip2(place, clock.firstL).map {
      case (p, t) =>
        val zonedDateTime = t.atZone(ZoneId.of(p.timezone))
        val sun = calculateSun(p.latitude, p.longitude, p.elevationMeters, zonedDateTime)
        p.copy(sunriseToday = sun.rise, sunsetToday = sun.set)

  private def calculateSun(latitude: Latitude,
                           longitude: Longitude,
                           altitudeMeters: Int,
                           zonedDateTime: ZonedDateTime): Sun = {
    val solarTime = SolarTime.ofLocation(latitude, longitude, altitudeMeters, StdSolarCalculator.TIME4J)
    val calendarDate = PlainDate.from(zonedDateTime.toLocalDate)
    def toTimestamp(moment: Moment) = Timestamp(moment.getPosixTime, moment.getNanosecond())
    val rise = solarTime.sunrise().apply(calendarDate)
    val set = solarTime.sunset().apply(calendarDate)
    Sun(rise, set)

  private def toResponse(place: Place): ReverseGeocodeLocationResponse = {

  private val emptyTaskResponse =

This was a pragmatic approach to putting together a Scala backend application. I picked a toy service to experiment with DI in the context of FP. I think that the result wasn’t too gnarly.

The source code is available on GitHub: reverse-geocoder.

Setting Up a Kubernetes Cluster on Ubuntu 16.04 via kubeadm

I have just redone the software stack on my homelab cluster from scratch. I am still using Ubuntu 16.04 since the Docker versions that are currently available for 18.04 are not yet supported by Kubernetes.

These are the lab notes that I compiled while installing Kubernetes v1.10.3 via kubeadm. I chose Calico for pod networking.

All Nodes

In this section we’ll prepare the master and worker nodes for Kubernetes. We’ll start from a newly minted Ubuntu 16.04 on each node.

Install Docker 17.03

apt-get update
apt-get install -y apt-transport-https ca-certificates curl software-properties-common
curl -fsSL | apt-key add -
add-apt-repository "deb$(. /etc/os-release; echo "$ID") $(lsb_release -cs) stable"
apt-get update && apt-get install -y docker-ce=$(apt-cache madison docker-ce | grep 17.03 | head -1 | awk '{print $3}')

Install kubeadm, kubelet and kubectl

apt-get update && apt-get install -y apt-transport-https curl
curl -s | apt-key add -
cat <<EOF >/etc/apt/sources.list.d/kubernetes.list
deb kubernetes-xenial main
EOF
apt-get update
apt-get install -y kubelet kubeadm kubectl

Turn swap off:

swapoff -a
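Keep in mind that swapoff -a only lasts until the next reboot. One way to make it stick, assuming swap is configured through /etc/fstab, is to comment out the swap entries there. The sketch below shows the idea on a throwaway file rather than the real /etc/fstab; the comment_out_swap helper and the sample entries are made up for illustration.

```shell
#!/bin/sh
# Hypothetical helper: print the fstab-style file given as $1 with every
# active swap entry commented out.
comment_out_swap() {
  sed -E '/^[^#].*[[:space:]]swap[[:space:]]/ s/^/#/' "$1"
}

# Demonstrate on a temporary file with made-up entries.
demo_fstab=$(mktemp)
printf '%s\n' \
  'UUID=aaaa / ext4 errors=remount-ro 0 1' \
  '/swapfile none swap sw 0 0' > "$demo_fstab"

result=$(comment_out_swap "$demo_fstab")
rm -f "$demo_fstab"

printf '%s\n' "$result"
```

On a real node you would review the output first, and only then write it back to /etc/fstab after taking a backup.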

Master Node

Configure cgroup driver used by kubelet on Master Node

Make sure that the cgroup driver used by kubelet is the same as the one used by Docker. To check whether the Docker cgroup driver matches the kubelet config:

docker info | grep -i cgroup
cat /etc/systemd/system/kubelet.service.d/10-kubeadm.conf
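Comparing the two by eye is easy to get wrong, so here is a rough sketch of how the check could be scripted. The two helper functions are hypothetical, and the sample inputs are stand-ins for the real command output, which may be laid out differently on your version of Docker or kubeadm.

```shell
#!/bin/sh
# Hypothetical helpers: each reads text on stdin and prints a driver name.
docker_cgroup_driver() {
  # docker info is assumed to print a line like "Cgroup Driver: cgroupfs".
  sed -n 's/^[[:space:]]*Cgroup Driver:[[:space:]]*//p' | head -n 1
}

kubelet_cgroup_driver() {
  # The kubelet drop-in is assumed to carry --cgroup-driver=<name>.
  sed -n 's/.*--cgroup-driver=\([a-z]*\).*/\1/p' | head -n 1
}

# Illustrative sample inputs, not captured from a real node.
sample_docker_info='Logging Driver: json-file
Cgroup Driver: cgroupfs'
sample_kubelet_conf='Environment="KUBELET_CGROUP_ARGS=--cgroup-driver=systemd"'

docker_driver=$(printf '%s\n' "$sample_docker_info" | docker_cgroup_driver)
kubelet_driver=$(printf '%s\n' "$sample_kubelet_conf" | kubelet_cgroup_driver)

if [ "$docker_driver" = "$kubelet_driver" ]; then
  echo "cgroup drivers match: $docker_driver"
else
  echo "cgroup drivers differ: docker=$docker_driver kubelet=$kubelet_driver"
fi
```

On a node, you would pipe docker info into the first helper and feed /etc/systemd/system/kubelet.service.d/10-kubeadm.conf to the second. An empty result from the second helper means the flag is not set at all.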

If the Docker cgroup driver and the kubelet config don’t match, update the latter. The flag we need to change is --cgroup-driver. If it’s already set, we can update the configuration like so:

sed -i "s/cgroup-driver=systemd/cgroup-driver=cgroupfs/g" /etc/systemd/system/kubelet.service.d/10-kubeadm.conf

Otherwise, open the systemd file and add the flag to an existing environment line. Then restart the kubelet:

systemctl daemon-reload
systemctl restart kubelet

Initialise Kubernetes Master

Initialise the master node by running kubeadm init. We need to specify the pod network CIDR for network policy to work correctly when we install Calico in a later step.

kubeadm init --pod-network-cidr=

To be able to use kubectl as a non-root user on the master:

mkdir -p $HOME/.kube
sudo cp -i /etc/kubernetes/admin.conf $HOME/.kube/config
sudo chown $(id -u):$(id -g) $HOME/.kube/config

Install Calico for networking:

kubectl apply -f
kubectl apply -f

Once Calico has been installed, confirm that it is working by checking that the kube-dns pod is running before joining the worker nodes.

shane@master1:~$ kubectl get pods --all-namespaces
NAMESPACE     NAME                              READY     STATUS    RESTARTS   AGE
kube-system   calico-node-2zrrz                 2/2       Running   0          8m
kube-system   etcd-master1                      1/1       Running   0          11m
kube-system   kube-apiserver-master1            1/1       Running   0          11m
kube-system   kube-controller-manager-master1   1/1       Running   0          11m
kube-system   kube-dns-86f4d74b45-bpsgs         3/3       Running   0          12m
kube-system   kube-proxy-pkfjx                  1/1       Running   0          12m
kube-system   kube-scheduler-master1            1/1       Running   0          11m
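If you would rather not eyeball the listing, the check can be scripted. The following is a sketch only: kube_dns_ready is a hypothetical helper that reads kubectl get pods --all-namespaces output on stdin, and it assumes the column layout shown above. The "pending" sample row is made up.

```shell
#!/bin/sh
# Hypothetical helper: succeed only if at least one kube-dns pod is listed
# and every kube-dns pod is Running with all containers ready (READY "n/n").
kube_dns_ready() {
  awk '
    $2 ~ /^kube-dns-/ {
      seen = 1
      split($3, ready, "/")
      if ($4 != "Running" || ready[1] != ready[2]) bad = 1
    }
    END { exit (seen && !bad) ? 0 : 1 }
  '
}

# Two rows taken from the listing above, plus a made-up row for a pod
# that has not started yet.
healthy='kube-system   kube-dns-86f4d74b45-bpsgs         3/3       Running   0          12m
kube-system   kube-proxy-pkfjx                  1/1       Running   0          12m'
pending='kube-system   kube-dns-86f4d74b45-bpsgs         0/3       Pending   0          1m'

if printf '%s\n' "$healthy" | kube_dns_ready; then healthy_result=ready; else healthy_result=waiting; fi
if printf '%s\n' "$pending" | kube_dns_ready; then pending_result=ready; else pending_result=waiting; fi

echo "healthy sample: $healthy_result, pending sample: $pending_result"
```

On the master you would pipe the real kubectl output into the helper, for example inside a loop that sleeps until it succeeds, before moving on to the worker nodes.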

Worker Nodes

Next we’ll join the worker nodes to our new Kubernetes cluster. Run the command that was output by kubeadm init on each of the nodes:

kubeadm join --token <token> <master-ip>:<master-port> --discovery-token-ca-cert-hash sha256:<hash>

We should see nodes joining the cluster shortly:

shane@master1:~$ kubectl get nodes
NAME      STATUS     ROLES     AGE       VERSION
master1   Ready      master    22m       v1.10.3
minion1   Ready      <none>    6m        v1.10.3
minion2   Ready      <none>    4m        v1.10.3
minion3   NotReady   <none>    4m        v1.10.3
minion4   NotReady   <none>    4m        v1.10.3

Configure Access from Workstation

To control the cluster remotely from our workstation, we grab the contents of /etc/kubernetes/admin.conf from the master node and merge it into our local ~/.kube/config configuration file.
