# Subscriptions

# Resolving a subscription

Resolvers for Subscription differ from resolvers for fields of other types. Specifically, a Subscription resolver must return an implementation of EventStream.

Resolver.swift
struct Resolver {
    // Example using an AsyncStream built from a closure
    func hello(_: Context, _: NoArguments) async -> EventStream<String> {
        let stream = AsyncStream<String> { con in
            for word in ["Hello", "Bonjour", "Ciao"] {
                con.yield(word)
            }
            con.finish()
        }
        return stream.toEventStream()
    }

    func postCreated(ctx: Context, _: NoArguments) async -> EventStream<Post> {
        ctx.pubsub.asyncStream(Post.self, for: "POST_CREATED").toEventStream()
    }
}

The resolver function must return an EventStream built from an AsyncEventStream, which can be derived from any AsyncSequence, a standard protocol introduced in Swift 5.5 for iterating asynchronously and sequentially over elements.
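As a rough illustration of the AsyncSequence side of this, the sketch below uses only the Swift standard library and no Pioneer types. It builds a finite AsyncStream and drains it with `for await`. The names `greetings` and `collect` are hypothetical helpers introduced for this example.

```swift
// Standard-library-only sketch; no Pioneer types are involved.

/// Builds a finite AsyncStream that yields three greetings and then finishes.
func greetings() -> AsyncStream<String> {
    AsyncStream<String> { continuation in
        for word in ["Hello", "Bonjour", "Ciao"] {
            continuation.yield(word)
        }
        continuation.finish()
    }
}

/// Drains a stream into an array (hypothetical helper for illustration).
func collect<Element: Sendable>(_ stream: AsyncStream<Element>) async -> [Element] {
    var elements: [Element] = []
    // `for await` suspends until the next element arrives or the stream finishes.
    for await element in stream {
        elements.append(element)
    }
    return elements
}
```

Because AsyncStream buffers yielded values by default, the elements produced before a consumer starts iterating are still delivered in order.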

# AsyncPubSub

Pioneer provides an in-memory publish-subscribe (Pub/Sub) model named AsyncPubSub, which tracks events and updates all active subscribers in a concurrency-safe way.

AsyncPubSub conforms to PubSub, which enables your server code to both publish events to a particular topic/trigger and listen for events associated with a particular topic.

import Pioneer

let pubsub = AsyncPubSub()
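To make the model more concrete, here is a minimal sketch of a trigger-keyed, in-memory pub/sub written with only the standard library. It is not Pioneer's AsyncPubSub implementation: `MiniPubSub` is a hypothetical type, and it is limited to `String` payloads to keep the example short.

```swift
/// Hypothetical in-memory pub/sub keyed by trigger name (not Pioneer's AsyncPubSub).
actor MiniPubSub {
    // One continuation per active subscriber, grouped by trigger.
    private var subscribers: [String: [AsyncStream<String>.Continuation]] = [:]

    /// Registers a new subscriber for `trigger` and returns its stream.
    func asyncStream(for trigger: String) -> AsyncStream<String> {
        var continuation: AsyncStream<String>.Continuation!
        let stream = AsyncStream<String> { continuation = $0 }
        subscribers[trigger, default: []].append(continuation)
        return stream
    }

    /// Delivers `payload` to every subscriber registered under `trigger`.
    func publish(for trigger: String, payload: String) {
        for continuation in subscribers[trigger] ?? [] {
            continuation.yield(payload)
        }
    }

    /// Finishes every stream registered under `trigger` and forgets them.
    func close(for trigger: String) {
        for continuation in subscribers.removeValue(forKey: trigger) ?? [] {
            continuation.finish()
        }
    }
}
```

The actor serialises access to the subscriber table, so concurrent publishers and subscribers cannot corrupt it. Note that no trigger has to be registered up front: publishing to an unknown trigger is simply a no-op in this sketch.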

# Publishing an event

You can publish an event using the publish method:

await pubsub.publish(for: "POST_CREATED", payload: Post(by: "Jeff Bezos", content: "How much money do I have"))
  • The first parameter is the trigger of the event you're publishing to, as a string.
    • You don't need to register a trigger name before publishing to it.
  • The second parameter is the payload associated with the event.

As an example, let's say our GraphQL API supports a createPost mutation.

A basic resolver for that might look like this.

struct Resolver {
    ...

    func createPost(ctx: Context, args: CreatePostArgs) async throws -> Post {
        let post = Post(args: args)
        try await post.create(on: ctx.req.db)
        return post
    }
}

The example schema in Swift

Mutation {
    Field("createPost", at: Resolver.createPost) {
        Argument("author", at: \.author)
        Argument("content", at: \.content)
    }
}

The example schema in GraphQL SDL

type Mutation {
  createPost(author: String, content: String): Post!
}

After we successfully persist the new post into the database, we can publish it to the pubsub as an event.

struct Resolver {
    ...

    func createPost(ctx: Context, args: CreatePostArgs) async throws -> Post {
        let post = Post(args: args)
        try await post.create(on: ctx.req.db)
        await ctx.pubsub.publish(for: "POST_CREATED", payload: post)
        return post
    }
}

Next, we can listen for this event in our Subscription resolver.

# Listening for events

An AsyncStream asynchronously iterates over events. If the stream comes from a PubSub, it is associated with a particular trigger and receives the events published under that trigger.

You can create an AsyncStream by calling the asyncStream method of PubSub, passing in the payload type and the event trigger that the stream should listen for.

pubsub.asyncStream(Post.self, for: "POST_CREATED")

Which brings us back to the example above.

Resolver.swift
struct Resolver {
    ...

    func postCreated(ctx: Context, _: NoArguments) async -> EventStream<Post> {
        ctx.pubsub.asyncStream(Post.self, for: "POST_CREATED").toEventStream()
    }
}

# Custom Pub/Sub

As mentioned before, AsyncPubSub is an in-memory pub-sub implementation that is limited to a single server instance, which may become an issue in production environments where there are multiple distributed server instances.

In that case, you will likely want to use or implement a custom pub-sub system that is backed by an external datastore.

# PubSub as protocol

Pioneer exports the PubSub protocol, which allows different implementations to share the same API as AsyncPubSub, notably implementations backed by popular event-publishing systems (e.g. Redis). This lets users of the library prototype with the in-memory AsyncPubSub and migrate to a distributed PubSub implementation with very few changes.

The basic rules for implementing a PubSub are as follows:

Conformance

The method asyncStream should return an AsyncStream for a single subscriber, which can be unsubscribed without closing the topic entirely.

  • DataType should conform to Sendable and Decodable so that it is safe to pass around and can be decoded if necessary (since it is likely to come from a network call).
  • It is recommended to create a new AsyncStream on each method call.
  • If you are having trouble broadcasting a publisher to multiple consumers/subscribers, take a look at Broadcast.

The method publish should publish events to all subscribers associated with the trigger.

  • DataType should conform to Sendable and Encodable so that it is safe to pass around and can be encoded if necessary (since it is likely to be sent over a network call).

The method close should dispose of and shut down all subscribers associated with the trigger.
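The Encodable and Decodable requirements above can be illustrated with a JSON round trip, which is one plausible way for a custom PubSub to move payloads across a network boundary. This is a sketch under assumptions: the `Post` type and the `encodePayload` / `decodePayload` helpers are hypothetical, and a real implementation might pick a different wire format.

```swift
import Foundation

/// Hypothetical payload type; any Sendable & Codable type works the same way.
struct Post: Sendable, Codable, Equatable {
    let author: String
    let content: String
}

/// Encodes a payload to JSON bytes, as a publisher might before a network call.
func encodePayload<T: Encodable>(_ payload: T) throws -> Data {
    try JSONEncoder().encode(payload)
}

/// Decodes JSON bytes back into a typed event, as a subscriber might on receipt.
func decodePayload<T: Decodable>(_ type: T.Type, from data: Data) throws -> T {
    try JSONDecoder().decode(type, from: data)
}
```

A subscriber that receives bytes it cannot decode into the requested type can either skip the event or surface the error; which one is appropriate depends on the application.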

Data races and asynchronous scope

The implementation should be free of data races and work safely under asynchronous scopes.

  • If you are having trouble with data-race-safe state management, consider using a Swift 5.5 actor.
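As a small sketch of the actor recommendation, the example below keeps a per-trigger subscriber count inside an actor and updates it from many concurrent tasks. `SubscriberRegistry` and `registerConcurrently` are hypothetical names used only for illustration and are not part of Pioneer.

```swift
/// Hypothetical registry of subscriber counts per trigger, protected by an actor.
actor SubscriberRegistry {
    private var counts: [String: Int] = [:]

    /// Records one more subscriber for `trigger`.
    func add(to trigger: String) {
        counts[trigger, default: 0] += 1
    }

    /// Returns the number of subscribers recorded for `trigger`.
    func count(for trigger: String) -> Int {
        counts[trigger] ?? 0
    }
}

/// Registers `n` subscribers from concurrent child tasks and returns the final count.
func registerConcurrently(_ n: Int, on registry: SubscriberRegistry) async -> Int {
    await withTaskGroup(of: Void.self) { group in
        for _ in 0..<n {
            group.addTask { await registry.add(to: "POST_CREATED") }
        }
        // The group waits for all child tasks before returning.
    }
    return await registry.count(for: "POST_CREATED")
}
```

Because every mutation goes through the actor, no increment is lost even though the child tasks run concurrently. The same dictionary in a plain class would need explicit locking to give that guarantee.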

PubSub specification
https://swiftpackageindex.com/d-exclaimation/pioneer/documentation/pioneer/pubsub

# Broadcast

Additionally, common client libraries for popular event-publishing systems usually provide only a function to subscribe to a specific publisher, with these limitations:

  • No option to unsubscribe without closing the publisher entirely
  • Only 1 subscriber allowed for each publisher / channel
    • Usually because each subscription is its own network connection, and having many of those can be resource intensive.

For this case, Pioneer provides the Broadcast actor, which can broadcast events from a publisher to multiple downstreams. All downstreams share the same upstream, and each one can be unsubscribed / disposed (to prevent leaks) without closing the upstream and publisher.

%%{init: { 'theme': 'base' } }%%
graph LR
    U[Upstream] -->|Source Events| B(Broadcast)
    B -->|Broadcasted Events| I[1st Downstream]
    B -->|Broadcasted Events| J[2nd Downstream]
    B -->|Broadcasted Events| K[3rd Downstream]
    B -->|Broadcasted Events| L[4th Downstream]

Broadcast provides the methods:

  • downstream to create a new subscriber stream that will receive the broadcasted events
  • publish to broadcast events to all subscribers
  • close to close the broadcast and shut down all subscribers

Essentially, it is applied to an event publisher to create multiple downstreams and handle the distribution of events, where:

  • Different consumers can subscribe to the same upstream and all of them get the same messages
    • Usually to avoid making multiple subscriptions, which might be resource intensive
  • Downstreams can be disposed, stopped, or cancelled individually to prevent leaks
    • A downstream is disposed by cancelling the Task used to consume it
  • Closing any downstream does not close the other downstreams, the broadcast, or the upstream
  • Closing the broadcast disposes all downstreams, but not necessarily the upstream

let broadcast = Broadcast<Event>()
receiveSubscriptionFromExternalPublisher(
    ...,
    onMessage: { msg async in
        let event = convertToEvent(msg)
        await broadcast.publish(event)
    },
    onFinished: { reason async in
        await broadcast.close()
    }
)

// All of these downstream are getting all messages from the upstream
let downstream0 = await broadcast.downstream().stream
let downstream1 = await broadcast.downstream().stream
let downstream2 = await broadcast.downstream().stream
let downstream3 = await broadcast.downstream().stream

sendToExternalPublisher(..., msg: SomeMessage())

// Dispose a downstream without affecting the others
let task3 = Task {
    for await msg in downstream3 {
        // ...
    }
}

task3.cancel()


// Shutdown all downstreams
closeExternalPublisher(...)
await broadcast.close()
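The fan-out behaviour described above can be approximated with a small standard-library actor. This is a sketch, not Pioneer's Broadcast: `MiniBroadcast` is a hypothetical type, and it assumes an explicit `unsubscribe(_:)` call keyed by an id, whereas the real Broadcast disposes a downstream when the consuming Task is cancelled.

```swift
/// Hypothetical fan-out actor in the spirit of Broadcast (not Pioneer's implementation).
actor MiniBroadcast<Element: Sendable> {
    private var nextID = 0
    private var downstreams: [Int: AsyncStream<Element>.Continuation] = [:]

    /// Creates a new downstream and returns its id together with its stream.
    func downstream() -> (id: Int, stream: AsyncStream<Element>) {
        let id = nextID
        nextID += 1
        var continuation: AsyncStream<Element>.Continuation!
        let stream = AsyncStream<Element> { continuation = $0 }
        downstreams[id] = continuation
        return (id, stream)
    }

    /// Sends `element` to every downstream that is still registered.
    func publish(_ element: Element) {
        for continuation in downstreams.values {
            continuation.yield(element)
        }
    }

    /// Finishes a single downstream without touching the others.
    func unsubscribe(_ id: Int) {
        downstreams.removeValue(forKey: id)?.finish()
    }

    /// Finishes all downstreams; the upstream is left to the caller.
    func close() {
        for continuation in downstreams.values {
            continuation.finish()
        }
        downstreams.removeAll()
    }
}
```

In this sketch a downstream that unsubscribes still receives everything published before it left, and the remaining downstreams keep receiving events until `close()` is called.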

Broadcast specification
https://swiftpackageindex.com/d-exclaimation/pioneer/main/documentation/pioneer/broadcast

# Redis Example

As an example, say we want to build a Redis-backed PubSub.

Here we create an example implementation of PubSub that uses Redis channels for Pub/Sub. We also make use of Broadcast so that we do not open multiple connections and instead use a single Redis subscription connection for all GraphQL subscriptions of the same topic.

RedisPubSub.swift
import Pioneer
import Redis
import Foundation
import NIOFoundationCompat

struct RedisPubSub: PubSub {

    // MARK: - Actor for distribution
    actor Dispatcher {
        private let redis: RedisClient
        private var broadcasting: [String: Broadcast<Data>] = [:]

        init(_ redis: RedisClient) {
            self.redis = redis
        }

        /// Get a downstream from the broadcast for the channel given
        func downstream(to channel: String) async -> AsyncStream<Data> {
            let broadcast = await subscribe(to: channel)
            let downstream = await broadcast.downstream()
            return downstream.stream
        }

        /// Get the broadcast for the channel if exist, otherwise make a new one
        private func subscribe(to channel: String) async -> Broadcast<Data> {
            if let broadcast = broadcasting[channel] {
                return broadcast
            }
            let broadcast = Broadcast<Data>()
            broadcasting[channel] = broadcast
            await apply(from: .init(channel), to: broadcast)
            return broadcast
        }

        /// Apply broadcasting to the Redis channel subscription
        private func apply(from channel: RedisChannelName, to broadcast: Broadcast<Data>) async {
            do {
                try await redis.subscribe(
                    to: channel,
                    messageReceiver: { _, msg in
                        guard case .bulkString(.some(let buffer)) = msg else { return }
                        let data = Data(buffer: buffer)
                        Task {
                            await broadcast.publish(data)
                        }
                    },
                    onUnsubscribe: { _, _ in
                        Task {
                            await broadcast.close()
                        }
                    }
                )
                .get()
            } catch {
                await broadcast.close()
            }
        }

        /// Publish the data (which is RESPValueConvertible) to the specific Redis channel
        func publish(for channel: String, _ value: Data) async {
            let _ = try? await redis.publish(value, to: .init(channel)).get()
        }

        /// Close the redis channel subscription and all of the downstreams
        func close(for channel: String) async {
            try? await redis.unsubscribe(from: .init(channel)).get()
            await broadcasting[channel]?.close()
        }
    }

    // MARK: - Protocol required methods

    public func asyncStream<DataType: Sendable & Decodable>(_ type: DataType.Type = DataType.self, for trigger: String) -> AsyncStream<DataType> {
        AsyncStream<DataType> { con in
            let task = Task {
                let stream = await dispatcher.downstream(to: trigger)
                for await data in stream {
                    guard let event = try? JSONDecoder().decode(DataType.self, from: data) else { continue }
                    con.yield(event)
                }
                con.finish()
            }
            con.onTermination = { @Sendable _ in
                task.cancel()
            }
        }
    }

    public func publish<DataType: Sendable & Encodable>(for trigger: String, payload: DataType) async {
        guard let data = try? JSONEncoder().encode(payload) else { return }
        await dispatcher.publish(for: trigger, data)
    }

    public func close(for trigger: String) async {
        await dispatcher.close(for: trigger)
    }

    // MARK: - Properties

    private let dispatcher: Dispatcher

    public init(_ redis: RedisClient) {
        self.dispatcher = .init(redis)
    }
}

Now the Resolver can have a pubsub property of type PubSub instead of AsyncPubSub, while still being able to use AsyncPubSub during development.

Message.swift
struct Message: Sendable, Codable { ... }

Resolver.swift
struct Resolver {
    let pubsub: PubSub = app.environment.isRelease ? RedisPubSub(app.redis) : AsyncPubSub()

    func create(ctx: Context, _: NoArguments) async -> Message {
        let message = ...
        await pubsub.publish(for: "message-create", payload: message)
        return message
    }

    func onCreate(ctx: Context, _: NoArguments) async -> EventStream<Message> {
        pubsub.asyncStream(Message.self, for: "message-create").toEventStream()
    }
}

So now, we can use RedisPubSub in a production environment.