Skip to content

Commit

Permalink
feat(functions): add experimental invoke with streamed responses (#346)
Browse files Browse the repository at this point in the history
* feat(functions): add experimental invoke with streamed responses

* chore: use HTTPClient

* fix x-region header after rebase

* rename method to `_invokeWithStreamedResponse`
  • Loading branch information
grdsdev authored Apr 22, 2024
1 parent e62ad89 commit 2611b09
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 28 deletions.
122 changes: 94 additions & 28 deletions Sources/Functions/FunctionsClient.swift
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import _Helpers
import Foundation
@preconcurrency import Foundation

#if canImport(FoundationNetworking)
import FoundationNetworking
Expand All @@ -20,21 +20,23 @@ public actor FunctionsClient {
var headers: [String: String]
/// The Region to invoke the functions in.
let region: String?
/// The fetch handler used to make requests.
let fetch: FetchHandler

private let http: HTTPClient

/// Initializes a new instance of `FunctionsClient`.
///
/// - Parameters:
/// - url: The base URL for the functions.
/// - headers: Headers to be included in the requests. (Default: empty dictionary)
/// - region: The Region to invoke the functions in.
/// - logger: SupabaseLogger instance to use.
/// - fetch: The fetch handler used to make requests. (Default: URLSession.shared.data(for:))
@_disfavoredOverload
public init(
url: URL,
headers: [String: String] = [:],
region: String? = nil,
logger: (any SupabaseLogger)? = nil,
fetch: @escaping FetchHandler = { try await URLSession.shared.data(for: $0) }
) {
self.url = url
Expand All @@ -43,7 +45,7 @@ public actor FunctionsClient {
self.headers["X-Client-Info"] = "functions-swift/\(version)"
}
self.region = region
self.fetch = fetch
http = HTTPClient(logger: logger, fetchHandler: fetch)
}

/// Initializes a new instance of `FunctionsClient`.
Expand All @@ -52,20 +54,16 @@ public actor FunctionsClient {
/// - url: The base URL for the functions.
/// - headers: Headers to be included in the requests. (Default: empty dictionary)
/// - region: The Region to invoke the functions in.
/// - logger: SupabaseLogger instance to use.
/// - fetch: The fetch handler used to make requests. (Default: URLSession.shared.data(for:))
public init(
url: URL,
headers: [String: String] = [:],
region: FunctionRegion? = nil,
logger: (any SupabaseLogger)? = nil,
fetch: @escaping FetchHandler = { try await URLSession.shared.data(for: $0) }
) {
self.url = url
self.headers = headers
if headers["X-Client-Info"] == nil {
self.headers["X-Client-Info"] = "functions-swift/\(version)"
}
self.region = region?.rawValue
self.fetch = fetch
self.init(url: url, headers: headers, region: region?.rawValue, logger: logger, fetch: fetch)
}

/// Updates the authorization header.
Expand All @@ -92,10 +90,10 @@ public actor FunctionsClient {
options: FunctionInvokeOptions = .init(),
decode: (Data, HTTPURLResponse) throws -> Response
) async throws -> Response {
let (data, response) = try await rawInvoke(
let response = try await rawInvoke(
functionName: functionName, invokeOptions: options
)
return try decode(data, response)
return try decode(response.data, response.response)
}

/// Invokes a function and decodes the response as a specific type.
Expand Down Expand Up @@ -130,33 +128,101 @@ public actor FunctionsClient {
private func rawInvoke(
functionName: String,
invokeOptions: FunctionInvokeOptions
) async throws -> (Data, HTTPURLResponse) {
) async throws -> Response {
var request = Request(
path: functionName,
method: .post,
headers: invokeOptions.headers.merging(headers) { invoke, _ in invoke },
body: invokeOptions.body
)

if let region = invokeOptions.region ?? region {
request.headers["x-region"] = region
}

let response = try await http.fetch(request, baseURL: url)

guard 200 ..< 300 ~= response.statusCode else {
throw FunctionsError.httpError(code: response.statusCode, data: response.data)
}

let isRelayError = response.response.value(forHTTPHeaderField: "x-relay-error") == "true"
if isRelayError {
throw FunctionsError.relayError
}

return response
}

/// Invokes a function with streamed response.
///
/// Function MUST return a `text/event-stream` content type for this method to work.
///
/// - Parameters:
/// - functionName: The name of the function to invoke.
/// - invokeOptions: Options for invoking the function.
/// - Returns: A stream of Data.
///
/// - Warning: Experimental method.
/// - Note: This method doesn't use the same underlying `URLSession` as the remaining methods in the library.
public func _invokeWithStreamedResponse(
_ functionName: String,
options invokeOptions: FunctionInvokeOptions = .init()
) -> AsyncThrowingStream<Data, any Error> {
let (stream, continuation) = AsyncThrowingStream<Data, any Error>.makeStream()
let delegate = StreamResponseDelegate(continuation: continuation)

let session = URLSession(configuration: .default, delegate: delegate, delegateQueue: nil)

let url = url.appendingPathComponent(functionName)
var urlRequest = URLRequest(url: url)
urlRequest.allHTTPHeaderFields = invokeOptions.headers.merging(headers) { invoke, _ in invoke }
urlRequest.httpMethod = (invokeOptions.method ?? .post).rawValue
urlRequest.httpBody = invokeOptions.body

let region = invokeOptions.region ?? region
if let region {
urlRequest.setValue(region, forHTTPHeaderField: "x-region")
let task = session.dataTask(with: urlRequest) { data, response, _ in
guard let httpResponse = response as? HTTPURLResponse else {
continuation.finish(throwing: URLError(.badServerResponse))
return
}

guard 200 ..< 300 ~= httpResponse.statusCode else {
let error = FunctionsError.httpError(code: httpResponse.statusCode, data: data ?? Data())
continuation.finish(throwing: error)
return
}

let isRelayError = httpResponse.value(forHTTPHeaderField: "x-relay-error") == "true"
if isRelayError {
continuation.finish(throwing: FunctionsError.relayError)
}
}

let (data, response) = try await fetch(urlRequest)
task.resume()

guard let httpResponse = response as? HTTPURLResponse else {
throw URLError(.badServerResponse)
}
continuation.onTermination = { _ in
task.cancel()

guard 200 ..< 300 ~= httpResponse.statusCode else {
throw FunctionsError.httpError(code: httpResponse.statusCode, data: data)
// Hold a strong reference to delegate until continuation terminates.
_ = delegate
}

let isRelayError = httpResponse.value(forHTTPHeaderField: "x-relay-error") == "true"
if isRelayError {
throw FunctionsError.relayError
}
return stream
}
}

final class StreamResponseDelegate: NSObject, URLSessionDataDelegate, Sendable {
let continuation: AsyncThrowingStream<Data, any Error>.Continuation

init(continuation: AsyncThrowingStream<Data, any Error>.Continuation) {
self.continuation = continuation
}

func urlSession(_: URLSession, dataTask _: URLSessionDataTask, didReceive data: Data) {
continuation.yield(data)
}

return (data, httpResponse)
func urlSession(_: URLSession, task _: URLSessionTask, didCompleteWithError error: (any Error)?) {
continuation.finish(throwing: error)
}
}
1 change: 1 addition & 0 deletions Sources/Supabase/SupabaseClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public final class SupabaseClient: @unchecked Sendable {
url: functionsURL,
headers: defaultHeaders,
region: options.functions.region,
logger: options.global.logger,
fetch: fetchWithAuth
)

Expand Down

0 comments on commit 2611b09

Please sign in to comment.