#EventStream

Swift 5.5 introduced a reactive-stream-like feature in the form of a protocol named AsyncSequence.

GraphQLSwift/GraphQL requires an implementation of EventStream, built with any reactive-stream-like data structure, to process subscription operations.
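As a quick illustration of the protocol in use, the sketch below builds a finite AsyncStream (one of the standard library's AsyncSequence types) and consumes it with `for await`. The function names `makeCounter` and `collectCounter` are made up for this example and are not part of Pioneer or GraphQL.

```swift
// Build a finite AsyncStream that yields 1, 2, 3 and then finishes.
// `makeCounter` is a hypothetical name used only for this illustration.
func makeCounter() -> AsyncStream<Int> {
    AsyncStream<Int> { continuation in
        for value in 1...3 {
            continuation.yield(value)
        }
        continuation.finish()
    }
}

// Consume the sequence with `for await`, collecting each value as it arrives.
func collectCounter() async -> [Int] {
    var received: [Int] = []
    for await value in makeCounter() {
        received.append(value)
    }
    return received
}
```

Calling `await collectCounter()` from an async context should return the three values in the order they were yielded.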

#AsyncEventStream

Pioneer provides an implementation of EventStream named AsyncEventStream that takes a generic AsyncSequence. This means you can create an event stream with this class from any AsyncSequence.

    let eventStream: EventStream<Int> = AsyncEventStream<Int, MyIntAsyncSequence>(
        from: MyIntAsyncSequence()
    )
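The snippet above uses `MyIntAsyncSequence` without defining it. A minimal sketch of what such a custom AsyncSequence could look like is shown here, using only the standard library. The counting behaviour and the `limit` parameter are assumptions chosen for illustration; any type that conforms to AsyncSequence would do.

```swift
// Hypothetical custom sequence that counts from 0 up to (but not including) `limit`.
struct MyIntAsyncSequence: AsyncSequence {
    typealias Element = Int

    let limit: Int

    // The default argument keeps `MyIntAsyncSequence()` valid.
    init(limit: Int = 3) {
        self.limit = limit
    }

    struct AsyncIterator: AsyncIteratorProtocol {
        let limit: Int
        var current = 0

        // Returning nil signals the end of the sequence.
        mutating func next() async -> Int? {
            guard current < limit else { return nil }
            defer { current += 1 }
            return current
        }
    }

    func makeAsyncIterator() -> AsyncIterator {
        AsyncIterator(limit: limit)
    }
}
```

Because `next()` does not throw, the sequence can be iterated with a plain `for await value in MyIntAsyncSequence()`.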

#Extensions for AsyncSequence

You can also convert using the extension method available on every AsyncSequence. In fact, this is the recommended approach, as there are a couple of additional features you can add while converting.

    let eventStream = AsyncStream<Int>(...)
        .toEventStream()

    // Initial value before any stream values
    let eventStream1 = AsyncStream<Int>(...)
        .toEventStream(initialValue: 0)

    // End value after stream finishes (excluding termination and value is lazily loaded; hence the function there)
    let eventStream2 = AsyncStream<Int>(...)
        .toEventStream(initialValue: 0, endValue: { 10 })
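To make the ordering of the initial and end values concrete, here is a standalone sketch that emulates the described behaviour with the standard library only. It is not Pioneer's implementation; `bookended` is a hypothetical helper, and the Sendable constraints are assumptions added so that the values can be used safely inside a `Task`.

```swift
// Hypothetical helper, not part of Pioneer: re-emits `source` with
// `initialValue` first and `endValue()` after the source finishes.
func bookended<S: AsyncSequence & Sendable>(
    _ source: S,
    initialValue: S.Element,
    endValue: @escaping @Sendable () -> S.Element
) -> AsyncThrowingStream<S.Element, Error> where S.Element: Sendable {
    AsyncThrowingStream { continuation in
        let task = Task {
            // The initial value goes out before anything from the source.
            continuation.yield(initialValue)
            do {
                for try await value in source {
                    continuation.yield(value)
                }
                // The closure is only evaluated once the source has finished.
                continuation.yield(endValue())
                continuation.finish()
            } catch {
                continuation.finish(throwing: error)
            }
        }
        // Stop consuming the source when the consumer goes away.
        continuation.onTermination = { @Sendable _ in task.cancel() }
    }
}
```

With a source that yields 1 and 2, `bookended(source, initialValue: 0, endValue: { 10 })` is expected to produce 0, 1, 2, 10 in that order.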

#Limitations

#Type casting limitations

One problem with requiring a protocol instead of a concrete type is the additional generic parameter, which may lead to deeply nested generic types.

Because of that, Pioneer uses AsyncThrowingStream when transforming stream values, instead of the built-in .compactMap method, to avoid deeply nested types that cannot be cast.

.map and .compactMap type results
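    let asyncStream: AsyncThrowingStream<Int, Error>

    // Each transformation wraps the previous type in another generic.
    let asyncStream1: AsyncMapSequence<AsyncThrowingStream<Int, Error>, Int> = asyncStream.map { $0 + 1 }

    // A throwing closure produces the "Throwing" variant of the wrapper.
    let asyncStream2: AsyncThrowingCompactMapSequence<AsyncMapSequence<AsyncThrowingStream<Int, Error>, Int>, Data> = asyncStream1.compactMap { try JSONEncoder().encode($0) }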
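One way to keep the result type flat is to pump the transformed values into a fresh AsyncThrowingStream, so the resulting type never mentions the source sequence. The sketch below shows the general idea with the standard library only. `transformed` and `encodedStream` are hypothetical names, and this is not necessarily how Pioneer implements it internally.

```swift
import Foundation

// Hypothetical helper: applies `transform` to every element and re-emits
// the non-nil results through an AsyncThrowingStream.
func transformed<S: AsyncSequence & Sendable, T: Sendable>(
    _ source: S,
    _ transform: @escaping @Sendable (S.Element) throws -> T?
) -> AsyncThrowingStream<T, Error> where S.Element: Sendable {
    AsyncThrowingStream { continuation in
        let task = Task {
            do {
                for try await value in source {
                    // Skip nil results, as compactMap would.
                    if let result = try transform(value) {
                        continuation.yield(result)
                    }
                }
                continuation.finish()
            } catch {
                continuation.finish(throwing: error)
            }
        }
        continuation.onTermination = { @Sendable _ in task.cancel() }
    }
}

// Two chained transformations still produce a simple, castable type.
func encodedStream() -> AsyncThrowingStream<Data, Error> {
    let numbers = AsyncStream<Int> { continuation in
        continuation.yield(1)
        continuation.finish()
    }
    let incremented: AsyncThrowingStream<Int, Error> = transformed(numbers) { $0 + 1 }
    return transformed(incremented) { try JSONEncoder().encode($0) }
}
```

The trade-off in this sketch is an extra task per transformation in exchange for a result type that stays `AsyncThrowingStream<T, Error>` no matter how many steps are chained.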

#Termination callback

By default, AsyncEventStream will cancel the task consuming the provided AsyncSequence when converting to an AsyncStream of a different type. For something like AsyncStream, this cancellation triggers its termination callback so that resources can be deallocated and memory leaks are prevented.

However, a custom AsyncSequence might have a different trigger and approach to termination. Hence, it's best to explicitly provide a termination callback when converting to EventStream.

    let eventStream = MyAsyncSequence().toEventStream(
        onTermination: { termination in
            if case .cancelled = termination {
                // do something
            }
        }
    )

Cases where the stream is no longer consumed or has stopped, and termination needs to be triggered:

  • The stream ends on its own
  • The client sends an explicit stop request to end the subscription (possibly before the stream ends)
  • The client disconnects, implicitly stopping any running subscription
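The cancellation path can be observed with a plain AsyncStream from the standard library. In the sketch below, a stream that never finishes on its own is consumed by a task, and cancelling that task causes the stream's `onTermination` callback to receive `.cancelled`. `makeTicker` and `terminationAfterCancel` are hypothetical names for this illustration and do not come from Pioneer.

```swift
// Builds a stream that never finishes on its own and reports why it terminated.
func makeTicker(
    onTermination: @escaping @Sendable (AsyncStream<Int>.Continuation.Termination) -> Void
) -> AsyncStream<Int> {
    AsyncStream<Int> { continuation in
        continuation.onTermination = onTermination
        continuation.yield(0)
    }
}

// Returns true if the stream was terminated because its consumer was cancelled.
func terminationAfterCancel() async -> Bool {
    await withCheckedContinuation { (done: CheckedContinuation<Bool, Never>) in
        let ticker = makeTicker { termination in
            // This is where resources would be released.
            if case .cancelled = termination {
                done.resume(returning: true)
            } else {
                done.resume(returning: false)
            }
        }
        // Stand-in for a subscription that is consuming the stream.
        let consumer = Task {
            for await _ in ticker {}
        }
        // Stand-in for a client stop request or disconnect.
        consumer.cancel()
    }
}
```

A custom AsyncSequence gets none of this for free, which is why passing an explicit `onTermination` when converting is the safer choice.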