From b46e8bc72f077a9550cc6263d6279d926cab7746 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Thu, 15 Aug 2024 13:22:41 +1200 Subject: [PATCH] WIP --- lib/async/redis/endpoint.rb | 248 ++++++++++++++++++++++++++++++++++++ 1 file changed, 248 insertions(+) create mode 100644 lib/async/redis/endpoint.rb diff --git a/lib/async/redis/endpoint.rb b/lib/async/redis/endpoint.rb new file mode 100644 index 0000000..32ff5ba --- /dev/null +++ b/lib/async/redis/endpoint.rb @@ -0,0 +1,248 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2024, by Samuel Williams. + +require 'io/endpoint' +require 'io/endpoint/host_endpoint' +require 'io/endpoint/ssl_endpoint' + +module Async + module Redis + # Represents a way to connect to a remote HTTP server. + class Endpoint < ::IO::Endpoint::Generic + SCHEMES = { + 'redis' => URI::HTTP, + 'rediss' => URI::HTTPS, + } + + def self.parse(string, endpoint = nil, **options) + url = URI.parse(string).normalize + + return self.new(url, endpoint, **options) + end + + # Construct an endpoint with a specified scheme, hostname, optional path, and options. + # + # @parameter scheme [String] The scheme to use, e.g. "http" or "https". + # @parameter hostname [String] The hostname to connect to (or bind to). + # @parameter *options [Hash] Additional options, passed to {#initialize}. + def self.for(scheme, hostname, path = "/", **options) + uri_klass = SCHEMES.fetch(scheme.downcase) do + raise ArgumentError, "Unsupported scheme: #{scheme.inspect}" + end + + self.new( + uri_klass.new(scheme, nil, hostname, nil, nil, path, nil, nil, nil).normalize, + **options + ) + end + + # Coerce the given object into an endpoint. + # @parameter url [String | Endpoint] The URL or endpoint to convert. + def self.[](url) + if url.is_a?(Endpoint) + return url + else + Endpoint.parse(url.to_s) + end + end + + # @option scheme [String] the scheme to use, overrides the URL scheme. + # @option hostname [String] the hostname to connect to (or bind to), overrides the URL hostname (used for SNI). + # @option port [Integer] the port to bind to, overrides the URL port. + # @option ssl_context [OpenSSL::SSL::SSLContext] the context to use for TLS. + # @option alpn_protocols [Array] the alpn protocols to negotiate. + def initialize(url, endpoint = nil, **options) + super(**options) + + raise ArgumentError, "URL must be absolute (include scheme, host): #{url}" unless url.absolute? + + @url = url + + if endpoint + @endpoint = self.build_endpoint(endpoint) + else + @endpoint = nil + end + end + + def to_url + url = @url.dup + + unless default_port? + url.port = self.port + end + + return url + end + + def to_s + "\#<#{self.class} #{self.to_url} #{@options}>" + end + + def inspect + "\#<#{self.class} #{self.to_url} #{@options.inspect}>" + end + + attr :url + + def address + endpoint.address + end + + def secure? + ['https', 'wss'].include?(self.scheme) + end + + def protocol + @options.fetch(:protocol) do + if secure? + Protocol::HTTPS + else + Protocol::HTTP + end + end + end + + def default_port + secure? ? 443 : 80 + end + + def default_port? + port == default_port + end + + def port + @options[:port] || @url.port || default_port + end + + # The hostname is the server we are connecting to: + def hostname + @options[:hostname] || @url.hostname + end + + def scheme + @options[:scheme] || @url.scheme + end + + def authority(ignore_default_port = true) + if ignore_default_port and default_port? + @url.hostname + else + "#{@url.hostname}:#{port}" + end + end + + # Return the path and query components of the given URL. + def path + buffer = @url.path || "/" + + if query = @url.query + buffer = "#{buffer}?#{query}" + end + + return buffer + end + + def alpn_protocols + @options.fetch(:alpn_protocols) {self.protocol.names} + end + + def localhost? + @url.hostname =~ /^(.*?\.)?localhost\.?$/ + end + + # We don't try to validate peer certificates when talking to localhost because they would always be self-signed. + def ssl_verify_mode + if self.localhost? + OpenSSL::SSL::VERIFY_NONE + else + OpenSSL::SSL::VERIFY_PEER + end + end + + def ssl_context + @options[:ssl_context] || OpenSSL::SSL::SSLContext.new.tap do |context| + if alpn_protocols = self.alpn_protocols + context.alpn_protocols = alpn_protocols + end + + context.set_params( + verify_mode: self.ssl_verify_mode + ) + end + end + + def build_endpoint(endpoint = nil) + endpoint ||= tcp_endpoint + + if secure? + # Wrap it in SSL: + return ::IO::Endpoint::SSLEndpoint.new(endpoint, + ssl_context: self.ssl_context, + hostname: @url.hostname, + timeout: self.timeout, + ) + end + + return endpoint + end + + def endpoint + @endpoint ||= build_endpoint + end + + def endpoint=(endpoint) + @endpoint = build_endpoint(endpoint) + end + + def bind(*arguments, &block) + endpoint.bind(*arguments, &block) + end + + def connect(&block) + endpoint.connect(&block) + end + + def each + return to_enum unless block_given? + + self.tcp_endpoint.each do |endpoint| + yield self.class.new(@url, endpoint, **@options) + end + end + + def key + [@url, @options] + end + + def eql? other + self.key.eql? other.key + end + + def hash + self.key.hash + end + + protected + + def tcp_options + options = @options.dup + + options.delete(:scheme) + options.delete(:port) + options.delete(:hostname) + options.delete(:ssl_context) + options.delete(:alpn_protocols) + options.delete(:protocol) + + return options + end + + def tcp_endpoint + ::IO::Endpoint.tcp(self.hostname, port, **tcp_options) + end + end + end +end