How Backpressure Works in Combine
I have recorded a video to describe how backpressure works in Combine.
This is my slide deck for the presentation that I gave on Apple’s new Combine framework at the iOS Developers Perth meetup. We go over the main concepts and types, and also explore how Combine implements backpressure.
I have updated the code examples in this blog post to match version 0.6.0 of CombineGRPC.
I was pretty excited when Apple announced SwiftUI and Combine at WWDC this year. I have experimented with nested components and RxSwift in the past and I have been wanting to use something like this on Apple’s platforms.
I am a big fan of gRPC. gRPC and Protocol Buffers have been game changers for us at WiseTime. In terms of developer ergonomics, they give us a very lightweight way of defining APIs, implementing them and calling them. The built-in streaming support means that it is just as easy to implement asynchronous messaging (push) as it is to implement request/response.
I have been keeping an eye on the Swift gRPC project, and when they released their first 1.0.0-alpha version based on SwiftNIO, I decided that the world was ready for CombineGRPC, a library that integrates Swift gRPC and the Combine framework. I dreamt of beautiful, responsive UIs; of streaming data straight to my lists as the user scrolled. Then I woke up and got to work.
import CombineGRPC
Writing CombineGRPC required some experimentation. Documentation on the NIO branch of Swift gRPC wasn’t fully fleshed out yet, and there were not many resources on the Combine framework at the time. I mostly followed the types, asked dumb questions, and validated my hypotheses with test scenarios. I found and reported one bug in Swift gRPC. It was very quickly fixed upstream.
Here are the steps for specifying, implementing and calling a gRPC service:
1. Define your service and messages using protocol buffers
2. Generate Swift code from the protobuf definition
3. Use the `handle` functions that are provided by CombineGRPC to implement your RPCs by making use of Combine publishers
4. Create a `GRPCExecutor` and use its `call` methods to interact with your gRPC service

Let’s see what this looks like in practice. In the following example, we define an `EchoService` that simply echoes back all the request messages that it receives. We’ll use it to demonstrate how easy it is to set up bidirectional streaming between a server and a client.
syntax = "proto3";
/*
* A simple bidirectional streaming RPC that takes a request stream
* as input and echoes back all the messages in an output stream.
*/
service EchoService {
rpc SayItBack (stream EchoRequest) returns (stream EchoResponse);
}
message EchoRequest {
string message = 1;
}
message EchoResponse {
string message = 1;
}
To generate Swift code from the protobuf, first install the protoc plugins for Swift and Swift gRPC, and then run:
protoc echo_service.proto --swift_out=Generated/
protoc echo_service.proto --swiftgrpc_out=Generated/
If you are using SPM, you can add CombineGRPC to your project by listing it as a dependency in Package.swift:
dependencies: [
.package(url: "https://github.com/vyshane/grpc-swift-combine.git", from: "0.6.0"),
],
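You’ll also need to add CombineGRPC to your target’s dependencies in Package.swift. A minimal sketch, where the target name is just a placeholder:

targets: [
  .target(
    name: "MyApp",
    dependencies: ["CombineGRPC"]),
],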
You are now ready to implement the server-side gRPC service. To do so, implement the Swift gRPC generated protocol for the service, and use the CombineGRPC `handle` function. You provide it with a handler function that accepts a Combine publisher of requests, `AnyPublisher<EchoRequest, Error>`, and returns a publisher of responses, `AnyPublisher<EchoResponse, GRPCStatus>`. Notice that the output stream may fail with a `GRPCStatus` error.
import Foundation
import Combine
import CombineGRPC
import GRPC
import NIO
class EchoServiceProvider: EchoProvider {
func sayItBack(context: StreamingResponseCallContext<EchoResponse>) ->
EventLoopFuture<(StreamEvent<EchoRequest>) -> Void>
{
handle(context) { requests in
requests
.map { req in
EchoResponse.with { $0.message = req.message }
}
.setFailureType(to: GRPCStatus.self)
.eraseToAnyPublisher()
}
}
}
Our implementation is simple enough. We map over the request stream and write the input messages into the output stream. CombineGRPC provides `handle` functions for each RPC type: there is a version for unary, server streaming, client streaming and bidirectional streaming RPCs.
To start the gRPC server, we use the Swift gRPC incantation:
let configuration = Server.Configuration(
target: ConnectionTarget.hostAndPort("localhost", 8080),
eventLoopGroup: PlatformSupport.makeEventLoopGroup(loopCount: 1),
serviceProviders: [EchoServiceProvider()]
)
_ = try Server.start(configuration: configuration).wait()
Now let’s set up our client. Again, it’s the same process that you would go through when using Swift gRPC.
let configuration = ClientConnection.Configuration(
target: ConnectionTarget.hostAndPort("localhost", 8080),
eventLoopGroup: PlatformSupport.makeEventLoopGroup(loopCount: 1)
)
let echoClient = EchoServiceClient(connection: ClientConnection(configuration: configuration))
To call the service, create a `GRPCExecutor` and use its `call` method. `call` is curried: you first configure it with the RPC that you want to call, `echoClient.sayItBack`. The client with the method `sayItBack` was generated from our protobuf definition by Swift gRPC.

The bidirectional streaming version of the `call` function then takes as a parameter a stream of requests, `AnyPublisher<Request, Error>`, and returns a stream of responses from the server, `AnyPublisher<Response, GRPCStatus>`. Let’s verify that our server does what it’s supposed to do:
let requests = repeatElement(EchoRequest.with { $0.message = "hello"}, count: 10)
let requestStream: AnyPublisher<EchoRequest, Error> =
Publishers.Sequence(sequence: requests).eraseToAnyPublisher()
let grpc = GRPCExecutor()
// Keep a reference to the returned AnyCancellable so that the subscription stays alive
let cancellable = grpc.call(echoClient.sayItBack)(requestStream)
  .filter { $0.message == "hello" }
  .count()
  .sink(receiveValue: { count in
    assert(count == 10)
  })
That’s it! You have set up bidirectional streaming between a gRPC server and client.
CombineGRPC provides versions of `call` and `handle` for all four RPC styles. `call` and `handle` are symmetrical: what you provide to `call` is given to your handler via `handle`, and what you output from your handler is what `call` will return when you call your RPC. Therefore, everything that you need to know about CombineGRPC is in the following table.
| RPC Style | Input and Output Types |
|---|---|
| Unary | `Request -> AnyPublisher<Response, GRPCStatus>` |
| Server streaming | `Request -> AnyPublisher<Response, GRPCStatus>` |
| Client streaming | `AnyPublisher<Request, Error> -> AnyPublisher<Response, GRPCStatus>` |
| Bidirectional streaming | `AnyPublisher<Request, Error> -> AnyPublisher<Response, GRPCStatus>` |
When you make a unary call, you provide a request message, and get back a response publisher. The response publisher will either publish a single response, or fail with a `GRPCStatus` error. Similarly, if you are handling a unary RPC call, you provide a handler that takes a request parameter and returns an `AnyPublisher<Response, GRPCStatus>`.
You can follow the same intuition to understand the types for the other RPC styles. The only difference is that publishers for the streaming RPCs may publish zero or more messages instead of the single response message that is expected from the unary response publisher.
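To make the unary shape concrete on the client side, here is a minimal sketch. It assumes a hypothetical unary RPC called `sayItOnce` on the same service; only the shape of the call follows from the table above.

// sayItOnce is a hypothetical unary RPC, shown for illustration only.
// A unary call takes a single request message and returns an
// AnyPublisher<Response, GRPCStatus>.
let request = EchoRequest.with { $0.message = "hello" }

let unaryCancellable = grpc.call(echoClient.sayItOnce)(request)
  .sink(
    receiveCompletion: { completion in
      // A failed call completes with a GRPCStatus error
      print("Completed: \(completion)")
    },
    receiveValue: { response in
      print("Received: \(response.message)")
    }
  )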
I’m sold, should I use CombineGRPC in my app? Not yet. The Combine framework is still in beta. The NIO version of Swift gRPC is still in alpha. All the operating systems that support Combine are currently in beta. I consider CombineGRPC to be in preview stage. It’s reached the point where its API is fleshed out enough that I feel comfortable soliciting feedback without wasting people’s time.
So, do let me know if you like the direction, have any questions or have suggestions.
The repository is hosted on GitHub at https://github.com/vyshane/grpc-swift-combine.
Make your networking changes in `/etc/cloud/cloud.cfg.d/50-curtin-networking.cfg`. For example:
network:
  version: 2
  renderer: networkd
  ethernets:
    enp2s0:
      dhcp4: no
      addresses: [192.168.1.100/24]
      gateway4: 192.168.1.1
      nameservers:
        addresses: [192.168.1.1,1.1.1.1,8.8.8.8]
Then:
sudo cloud-init clean
sudo cloud-init init
sudo netplan apply
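To check that the new configuration took effect, you can inspect the interface. This uses the interface name from the example above:

# Confirm that enp2s0 came up with the expected address
networkctl status enp2s0
ip addr show enp2s0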
If you have multiple network cards and only one of them is connected, you can set the unused card as optional so that the system doesn’t wait for a long time for it to come up at boot. For example:
network:
  version: 2
  renderer: networkd
  ethernets:
    enp1s0:
      dhcp4: yes
      optional: true
    enp2s0:
      dhcp4: no
      addresses: [192.168.1.100/24]
      gateway4: 192.168.1.1
      nameservers:
        addresses: [192.168.1.1,1.1.1.1,8.8.8.8]
When Apple announced the FoundationDB Record Layer, I was keen to take it for a test drive.
The first order of business is to figure out a way to write integration tests. The docker-it-scala project gives us an easy way to launch Docker containers for testing, and the FoundationDB project provides an official Docker image.
Unfortunately, we can’t use the FoundationDB Docker image out of the box as it doesn’t come with a pre-configured database. To get around this, we can implement a custom ready checker that gives us the ability to run arbitrary fdbcli commands on startup.
Here’s our implementation of the FoundationDB Docker test kit. We’re using the Spotify Docker client.
import com.apple.foundationdb.record.provider.foundationdb.FDBDatabaseFactory
import com.spotify.docker.client.messages.PortBinding
import com.whisk.docker.testkit.ContainerState.{HasId, Ready}
import com.whisk.docker.testkit.{
BaseContainer,
ContainerCommandExecutor,
ContainerSpec,
DockerReadyChecker,
FailFastCheckException,
ManagedContainers
}
import com.whisk.docker.testkit.scalatest.DockerTestKitForAll
import org.scalatest.Suite
import scala.concurrent.{ExecutionContext, Future}
/*
* Provides a FoundationDB Docker container for integration tests
*/
trait FoundationDbDockerTestKit extends DockerTestKitForAll {
self: Suite =>
val fdb = FDBDatabaseFactory
.instance()
// Assumes a src/test/resources/fdb.cluster file with the following contents:
// docker:docker@127.0.0.1:4500
.getDatabase(getClass.getResource("/fdb.cluster").getPath)
private val fdbPort = 4500
private lazy val fdbContainer = ContainerSpec("foundationdb/foundationdb:latest")
.withPortBindings(fdbPort -> PortBinding.of("0.0.0.0", fdbPort))
.withEnv("FDB_NETWORKING_MODE=host", s"FDB_PORT=$fdbPort")
// The FoundationDB Docker image doesn't come with a pre-configured database
.withReadyChecker(new FdbDockerReadyChecker("configure new single memory"))
override val managedContainers: ManagedContainers = fdbContainer.toManagedContainer
}
We use a custom `FdbDockerReadyChecker` that runs the fdbcli command `configure new single memory` once FoundationDB is up and running. This creates a new database that uses the memory storage engine and single (no replication) mode. This is fine for integration tests.
Our ready checker looks like this:
/*
* Ready checker for FoundationDB container, with the ability to run a fdbcli --exec command
* once FoundationDB has started.
*/
class FdbDockerReadyChecker(onReadyFdbcliExec: String = "status") extends DockerReadyChecker {
override def apply(container: BaseContainer)(implicit docker: ContainerCommandExecutor,
ec: ExecutionContext): Future[Unit] = {
val execOnReady: (String) => Future[Unit] = (containerId) => {
Future {
docker.client
.execCreate(containerId, Array("/usr/bin/fdbcli", "--exec", onReadyFdbcliExec))
} map { exec =>
docker.client.execStart(exec.id()).readFully()
} map (_ => ())
}
container.state() match {
case Ready(info) =>
execOnReady(info.id())
case state: HasId =>
docker
.withLogStreamLinesRequirement(state.id, withErr = true)(
_.contains("FDBD joined cluster.")
)
.flatMap(_ => execOnReady(state.id))
case _ =>
Future.failed(
new FailFastCheckException("Can't initialise LogStream to container without ID")
)
}
}
}
To use the above, you’ll need an `fdb.cluster` file with the following contents in your test resources directory:
docker:docker@127.0.0.1:4500
Finally, here’s what an integration test might look like:
import org.scalatest.{AsyncWordSpec, Matchers}
import monix.execution.Scheduler.Implicits.global
/*
* Integration tests for a SampleRepository
* The tests in this spec need a Docker engine to run FoundationDB
*/
class SampleRepositorySpec extends AsyncWordSpec with Matchers with FoundationDbDockerTestKit {
val repository = new SampleRepository(fdb)
"SampleRepository" when {
"asked to get a record that doesn't exist" should {
"return empty option" in {
repository.get("inexistantrecord").runAsync map { r =>
r shouldEqual None
}
}
}
}
}
Here & Now is a weather app that I made to experiment with an iOS app architecture based on nestable components and RxSwift.
View controllers are deliberately minimal. They wire up their root `ViewComponent` and add its subview. Nested `ViewComponent`s are used to break the UI code down into smaller logical pieces.

A `ViewComponent` is defined as:
import UIKit
import RxSwift

protocol ViewComponent {
// Streams used by the component
associatedtype Inputs
// Streams produced by the component
associatedtype Outputs
// Root view of the component, add as subview of parent component
var view: UIView { get }
init(disposedBy: DisposeBag)
// Subscribe to input streams, export streams produced by component
func start(_ inputs: Inputs) -> Outputs
}
extension ViewComponent {
// Stop any services started by the component
func stop() {}
}
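As a rough illustration of how little a view controller does in this setup, here is a minimal sketch. `RootComponent` is a hypothetical `ViewComponent` whose `Inputs` is `Void`; the app’s real components take streams as inputs.

import UIKit
import RxSwift

class RootViewController: UIViewController {
    private let disposeBag = DisposeBag()
    private var rootComponent: RootComponent!

    override func viewDidLoad() {
        super.viewDidLoad()
        // Wire up the root component and add its view
        rootComponent = RootComponent(disposedBy: disposeBag)
        rootComponent.view.frame = view.bounds
        rootComponent.view.autoresizingMask = [.flexibleWidth, .flexibleHeight]
        view.addSubview(rootComponent.view)
        // Subscribe the component to its inputs; outputs are ignored in this sketch
        _ = rootComponent.start(())
    }
}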
Core UI logic is written in component protocol extensions for ease of testing. UI state changes are implemented as pure functions that operate on Rx types like `Observable`. For example, the map style is a function of the current time at the map location. A light style is used when it’s day time at the map location, and a dark style when it’s night time.
extension MapComponent {
func mapStyle(forCameraPosition: Observable<GMSCameraPosition>,
date: Observable<Date>) -> Observable<MapStyle> {
let location = forCameraPosition.map(toLocation)
return uiScheme(forLocation: location, date: date)
.map { $0.style().mapStyle }
}
func uiScheme(forLocation: Observable<CLLocation>,
date: Observable<Date>) -> Observable<UIScheme> {
return Observable
.combineLatest(forLocation, date) { (l, d) in
if let dayTime = isDaytime(date: d, coordinate: l.coordinate) {
return dayTime ? .light : .dark
}
return .light
}
}
// ...
}
The source code for Here & Now is available on GitHub.
The following Dockerfile gives an example of a multistage build that runs sbt in a builder container. This means that users don’t need to install Scala tooling on their machines in order to build the project. To optimise build times, I cache dependencies first by running `sbt update` in a precursor step to `sbt stage`.
FROM hseeberger/scala-sbt:8u181_2.12.7_1.2.6 as builder
WORKDIR /build
# Cache dependencies first
COPY project project
COPY build.sbt .
RUN sbt update
# Then build
COPY . .
RUN sbt stage
# Download Geonames file
RUN wget http://download.geonames.org/export/dump/cities500.zip
RUN unzip cities500.zip
FROM openjdk:8u181-jre-slim
WORKDIR /app
COPY --from=builder /build/target/universal/stage/. .
COPY --from=builder /build/cities500.txt .
ENV PLACES_FILE_PATH=/app/cities500.txt
RUN mv bin/$(ls bin | grep -v .bat) bin/start
CMD ["./bin/start"]
The final image only has the JRE, and no build tools.
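Building the image only requires Docker on the host; the tag below is arbitrary:

# sbt runs inside the builder stage; the staged app is copied into a slim JRE image
docker build -t my-service .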
I made a reverse geocoder gRPC server as a demo of how one might structure a backend service in Scala. I structured the application to have a purely functional core, with an imperative shell.
However, there’s a twist to the plot: I’m mixing classical OOP with pure FP. I wanted to see what the code would look like if I used a dependency injection framework (Airframe) to wire up the side effects at the outer edges.
The main method is where we build the object graph:
object Main extends App with LazyLogging {
override def main(args: Array[String]): Unit = {
val config = loadConfigOrThrow[Config]
// Wire up dependencies
newDesign
.bind[Config].toInstance(config)
.bind[Clock].toInstance(clock)
.bind[Healthttpd].toInstance(Healthttpd(config.statusPort))
.bind[LinesFileReader].toInstance(fileReader)
// Load places from disk immediately upon startup
.bind[KDTreeMap[Location, Place]].toEagerSingletonProvider(loadPlacesBlocking)
// Startup
.withProductionMode
.noLifeCycleLogging
.withSession(_.build[Application].run())
// Side effects are injected at the edge:
lazy val fileReader: LinesFileReader = () => {
logger.info(s"Loading places from ${config.placesFilePath}")
val reader = new BufferedReader(
new InputStreamReader(new FileInputStream(config.placesFilePath), "UTF-8")
)
Observable.fromLinesReader(reader)
}
lazy val loadPlacesBlocking: PlacesLoader => KDTreeMap[Location, Place] = { loader =>
Await.result(loader.load().runAsync, 1 minute)
}
lazy val clock: Clock = {
Observable
.interval(1 second)
.map(_ => Instant.now())
}
}
}
Side effects are the file reader and the clock: `fileReader` gives us a stream of lines from the file, and the clock is a stream of `Instant`s. Both are modelled using the Monix `Observable` type.
The `Application` trait is still very much imperative. We set up application status, served via Healthttpd, then start the gRPC server.
trait Application extends LazyLogging {
private val config = bind[Config]
private val healthttpd = bind[Healthttpd]
private val reverseGeocoderService = bind[ReverseGeocoderService]
def run(): Unit = {
healthttpd.startAndIndicateNotReady()
logger.info("Starting gRPC server")
val grpcServer = NettyServerBuilder
.forPort(config.grpcPort)
.addService(ReverseGeocoderGrpcMonix.bindService(reverseGeocoderService, monix.execution.Scheduler.global))
.build()
.start()
sys.ShutdownHookThread {
grpcServer.shutdown()
healthttpd.stop()
}
healthttpd.indicateReady()
grpcServer.awaitTermination()
}
}
The core of the application, concerned with serving requests, is pure, and easily tested. I’m using Task as an IO monad.
class ReverseGeocodeLocationRpc(places: KDTreeMap[Location, Place], clock: Clock) {
def handle(request: ReverseGeocodeLocationRequest): Task[ReverseGeocodeLocationResponse] = {
findNearest(request.latitude, request.longitude)(places)
.map(Task.now(_))
.map(addSunTimes(_, clock).map(toResponse))
.getOrElse(emptyTaskResponse)
}
private def findNearest(latitude: Latitude, longitude: Longitude)(places: KDTreeMap[Location, Place]): Option[Place] = {
places
.findNearest((latitude, longitude), 1)
.headOption
.map(_._2)
}
private case class Sun(rise: Option[Timestamp], set: Option[Timestamp])
private def addSunTimes(place: Task[Place], clock: Clock): Task[Place] = {
Task.zip2(place, clock.firstL).map {
case (p, t) =>
val zonedDateTime = t.atZone(ZoneId.of(p.timezone))
val sun = calculateSun(p.latitude, p.longitude, p.elevationMeters, zonedDateTime)
p.copy(sunriseToday = sun.rise, sunsetToday = sun.set)
}
}
private def calculateSun(latitude: Latitude,
longitude: Longitude,
altitudeMeters: Int,
zonedDateTime: ZonedDateTime): Sun = {
val solarTime = SolarTime.ofLocation(latitude, longitude, altitudeMeters, StdSolarCalculator.TIME4J)
val calendarDate = PlainDate.from(zonedDateTime.toLocalDate)
def toTimestamp(moment: Moment) = Timestamp(moment.getPosixTime, moment.getNanosecond())
val rise = solarTime.sunrise().apply(calendarDate).asScala.map(toTimestamp)
val set = solarTime.sunset().apply(calendarDate).asScala.map(toTimestamp)
Sun(rise, set)
}
private def toResponse(place: Place): ReverseGeocodeLocationResponse = {
ReverseGeocodeLocationResponse(Some(place))
}
private val emptyTaskResponse = Task.now(ReverseGeocodeLocationResponse.defaultInstance)
}
This was a pragmatic approach to putting together a Scala backend application. I picked a toy service to experiment with DI in the context of FP. I think that the result wasn’t too gnarly.
The source code is available on GitHub: reverse-geocoder.
I have just redone the software stack on my homelab cluster from scratch. I am still using Ubuntu 16.04 since the Docker versions that are currently available for 18.04 are not yet supported by Kubernetes.
These are the lab notes that I compiled while installing Kubernetes v1.10.3 via kubeadm. I chose Calico for pod networking.
In this section we’ll prepare the master and worker nodes for Kubernetes. We’ll start from a newly minted Ubuntu 16.04 on each node:
apt-get update
apt-get install -y apt-transport-https ca-certificates curl software-properties-common
curl -fsSL https://download.docker.com/linux/ubuntu/gpg | apt-key add -
add-apt-repository "deb https://download.docker.com/linux/$(. /etc/os-release; echo "$ID") $(lsb_release -cs) stable"
apt-get update && apt-get install -y docker-ce=$(apt-cache madison docker-ce | grep 17.03 | head -1 | awk '{print $3}')
apt-get update && apt-get install -y apt-transport-https curl
curl -s https://packages.cloud.google.com/apt/doc/apt-key.gpg | apt-key add -
cat <<EOF >/etc/apt/sources.list.d/kubernetes.list
deb http://apt.kubernetes.io/ kubernetes-xenial main
EOF
apt-get update
apt-get install -y kubelet kubeadm kubectl
Turn swap off:
swapoff -a
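To keep swap off across reboots, you may also want to comment out the swap entry in /etc/fstab, for example:

# Comment out any swap entries; keeps a backup of the original file
sed -i.bak '/\sswap\s/ s/^/#/' /etc/fstab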
Make sure that the cgroup driver used by kubelet is the same as the one used by Docker. To check whether the Docker cgroup driver matches the kubelet config:
docker info | grep -i cgroup
cat /etc/systemd/system/kubelet.service.d/10-kubeadm.conf
If the Docker cgroup driver and the kubelet config don’t match, update the latter. The flag we need to change is --cgroup-driver. If it’s already set, we can update the configuration like so:
sed -i "s/cgroup-driver=systemd/cgroup-driver=cgroupfs/g" /etc/systemd/system/kubelet.service.d/10-kubeadm.conf
Otherwise, open the systemd file and add the flag to an existing environment line. Then restart the kubelet:
systemctl daemon-reload
systemctl restart kubelet
Initialise the master node by running `kubeadm init`. We need to specify the pod network CIDR for network policy to work correctly when we install Calico in a later step.
kubeadm init --pod-network-cidr=10.0.0.0/16
To be able to use kubectl as non-root user on master:
mkdir -p $HOME/.kube
sudo cp -i /etc/kubernetes/admin.conf $HOME/.kube/config
sudo chown $(id -u):$(id -g) $HOME/.kube/config
Install Calico for networking:
kubectl apply -f https://docs.projectcalico.org/v3.1/getting-started/kubernetes/installation/hosted/rbac-kdd.yaml
kubectl apply -f https://docs.projectcalico.org/v3.1/getting-started/kubernetes/installation/hosted/kubernetes-datastore/calico-networking/1.7/calico.yaml
Once Calico has been installed, confirm that it is working by checking that the kube-dns pod is running before joining the worker nodes.
shane@master1:~$ kubectl get pods --all-namespaces
NAMESPACE NAME READY STATUS RESTARTS AGE
kube-system calico-node-2zrrz 2/2 Running 0 8m
kube-system etcd-master1 1/1 Running 0 11m
kube-system kube-apiserver-master1 1/1 Running 0 11m
kube-system kube-controller-manager-master1 1/1 Running 0 11m
kube-system kube-dns-86f4d74b45-bpsgs 3/3 Running 0 12m
kube-system kube-proxy-pkfjx 1/1 Running 0 12m
kube-system kube-scheduler-master1 1/1 Running 0 11m
Next we’ll join the worker nodes to our new Kubernetes cluster. Run the command that was output by `kubeadm init` on each of the nodes:
kubeadm join --token <token> <master-ip>:<master-port> --discovery-token-ca-cert-hash sha256:<hash>
We should see nodes joining the cluster shortly:
shane@master1:~$ kubectl get nodes
NAME STATUS ROLES AGE VERSION
master1 Ready master 22m v1.10.3
minion1 Ready <none> 6m v1.10.3
minion2 Ready <none> 4m v1.10.3
minion3 NotReady <none> 4m v1.10.3
minion4 NotReady <none> 4m v1.10.3
To control the cluster remotely from our workstation, we grab the contents of `/etc/kubernetes/admin.conf` from the master node and merge it into our local `~/.kube/config` configuration file.
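One way to do the merge, assuming SSH access and sudo on the master node (hostnames and paths match the examples above, adjust as needed):

# Copy the admin kubeconfig from the master; admin.conf is only readable by root
ssh shane@master1 "sudo cat /etc/kubernetes/admin.conf" > ~/admin.conf

# Merge it into the local kubeconfig, then clean up
KUBECONFIG=$HOME/.kube/config:$HOME/admin.conf kubectl config view --flatten > $HOME/.kube/config.merged
mv $HOME/.kube/config.merged $HOME/.kube/config
rm ~/admin.conf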