Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Redis caches are optionally prefixed #126

Merged
merged 1 commit into from
Oct 1, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,18 @@ play.cache.redis {
#
timeout: 1s

# each instance can apply its own prefix to all keys it works with.
# This enables, e.g., use of multiple instances with the same redis
# database under the hood. There are two intended use cases:
#
# 1. two named instances with the same database
# 2. to avoid conflicts when there is also another application accessing the cache
#
# With named caches, this value may and should be locally overriden under
# 'play.cache.redis.instances.instance-name.this-property'.
#
prefix: null

# akka dispatcher
#
# note: this is global definition, can be locally overriden for each
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,5 @@ trait RedisCacheComponents
/** translates the cache name into the configuration */
implicit def redisInstance( name: String )( implicit resolver: play.api.cache.redis.configuration.RedisInstanceResolver ): RedisInstance = manager.instanceOf( name ).resolved

def cacheApi( instance: RedisInstance ): impl.RedisCaches = new impl.RedisCachesProvider( instance, akkaSerializer, environment, recoveryPolicyResolver ).get
def cacheApi( instance: RedisInstance ): impl.RedisCaches = new impl.RedisCachesProvider( instance, akkaSerializer, environment ).get
}
6 changes: 3 additions & 3 deletions src/main/scala/play/api/cache/redis/RedisCacheModule.scala
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,11 @@ class GuiceRedisCacheProvider( instance: RedisInstanceProvider ) extends Provide
lazy val get = new impl.RedisCachesProvider(
instance = instance.resolved( bind[ configuration.RedisInstanceResolver ] ),
serializer = bind[ connector.AkkaSerializer ],
environment = bind[ Environment ],
recovery = bind[ RecoveryPolicyResolver ]
environment = bind[ Environment ]
)(
system = bind[ akka.actor.ActorSystem ],
lifecycle = bind[ ApplicationLifecycle ]
lifecycle = bind[ ApplicationLifecycle ],
recovery = bind[ RecoveryPolicyResolver ]
).get
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ trait RedisSettings {
def recovery: String
/** configuration source */
def source: String
/** instance prefix */
def prefix: Option[ String ]
/** trait-specific equals */
override def equals( obj: scala.Any ) = equalsAsSettings( obj )
/** trait-specific equals, invokable from children */
Expand All @@ -40,24 +42,27 @@ object RedisSettings extends ConfigLoader[ RedisSettings ] {
dispatcher = loadInvocationContext( config, path )( required ),
recovery = loadRecovery( config, path )( required ),
timeout = loadTimeout( config, path )( required ),
source = loadSource( config, path )( "standalone".asFallback )
source = loadSource( config, path )( "standalone".asFallback ),
prefix = loadPrefix( config, path )( None.asFallback )
)

def withFallback( fallback: RedisSettings ) = new ConfigLoader[ RedisSettings ] {
def load( config: Config, path: String ) = apply(
dispatcher = RedisSettings.loadInvocationContext( config, path )( fallback.invocationContext.asFallback ),
recovery = RedisSettings.loadRecovery( config, path )( fallback.recovery.asFallback ),
timeout = RedisSettings.loadTimeout( config, path )( fallback.timeout.asFallback ),
source = loadSource( config, path )( fallback.source.asFallback )
source = loadSource( config, path )( fallback.source.asFallback ),
prefix = RedisSettings.loadPrefix( config, path )( fallback.prefix.asFallback )
)
}

def apply( dispatcher: String, timeout: FiniteDuration, recovery: String, source: String ): RedisSettings =
create( dispatcher, timeout, recovery, source )
def apply( dispatcher: String, timeout: FiniteDuration, recovery: String, source: String, prefix: Option[ String ] = None ): RedisSettings =
create( dispatcher, prefix, timeout, recovery, source )

@inline
private def create( _dispatcher: String, _timeout: FiniteDuration, _recovery: String, _source: String ) = new RedisSettings {
private def create( _dispatcher: String, _prefix: Option[ String ], _timeout: FiniteDuration, _recovery: String, _source: String ) = new RedisSettings {
val invocationContext = _dispatcher
val prefix = _prefix
val recovery = _recovery
val timeout = _timeout
val source = _source
Expand All @@ -72,6 +77,9 @@ object RedisSettings extends ConfigLoader[ RedisSettings ] {
private def loadSource( config: Config, path: String )( default: String => String ): String =
config.getOption( path / "source", _.getString ) getOrElse default( path / "source" )

private def loadPrefix( config: Config, path: String )( default: String => Option[ String ] ): Option[ String ] =
config.getOption( path / "prefix", _.getString ) orElse default( path / "prefix" )

private def loadTimeout( config: Config, path: String )( default: String => FiniteDuration ): FiniteDuration =
config.getOption( path / "timeout", _.getDuration ).map {
duration => FiniteDuration( duration.getSeconds, TimeUnit.SECONDS )
Expand All @@ -86,6 +94,7 @@ object RedisSettings extends ConfigLoader[ RedisSettings ] {
*/
trait RedisDelegatingSettings extends RedisSettings {
def settings: RedisSettings
def prefix = settings.prefix
def source = settings.source
def timeout = settings.timeout
def recovery = settings.recovery
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
package play.api.cache.redis.connector

import scala.concurrent.Future
import scala.concurrent.duration.Duration
import scala.concurrent.{ExecutionContext, Future}
import scala.reflect.ClassTag

import akka.util.Timeout

/**
* Internal non-blocking Redis API implementing REDIS protocol
*
Expand Down
65 changes: 43 additions & 22 deletions src/main/scala/play/api/cache/redis/impl/RedisCache.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,83 +15,104 @@ private[ impl ] class RedisCache[ Result[ _ ] ]( redis: RedisConnector, builder:

@inline implicit protected def implicitBuilder: Builders.ResultBuilder[ Result ] = builder

def get[ T: ClassTag ]( key: String ) =
def get[ T: ClassTag ]( key: String ) = key.prefixed { key =>
redis.get[ T ]( key ).recoverWithDefault( None )
}

def getAll[ T: ClassTag ]( keys: String* ): Result[ Seq[ Option[ T ] ] ] =
def getAll[ T: ClassTag ]( keys: String* ): Result[ Seq[ Option[ T ] ] ] = keys.prefixed { keys =>
redis.mGet[ T ]( keys: _* ).recoverWithDefault( keys.toList.map( _ => None ) )
}

def set( key: String, value: Any, expiration: Duration ) =
def set( key: String, value: Any, expiration: Duration ) = key.prefixed { key =>
redis.set( key, value, expiration ).recoverWithDone
}

def setIfNotExists( key: String, value: Any, expiration: Duration ) =
def setIfNotExists( key: String, value: Any, expiration: Duration ) = key.prefixed { key =>
redis.setIfNotExists( key, value ).map { result =>
if ( result && expiration.isFinite( ) ) redis.expire( key, expiration )
result
}.recoverWithDefault( true )
}

def setAll( keyValues: (String, Any)* ): Result[ Done ] =
def setAll( keyValues: (String, Any)* ): Result[ Done ] = keyValues.prefixed { keyValues =>
redis.mSet( keyValues: _* ).recoverWithDone
}

def setAllIfNotExist( keyValues: (String, Any)* ): Result[ Boolean ] =
def setAllIfNotExist( keyValues: (String, Any)* ): Result[ Boolean ] = keyValues.prefixed { keyValues =>
redis.mSetIfNotExist( keyValues: _* ).recoverWithDefault( true )
}

def append( key: String, value: String, expiration: Duration ): Result[ Done ] =
def append( key: String, value: String, expiration: Duration ): Result[ Done ] = key.prefixed { key =>
redis.append( key, value ).flatMap { result =>
// if the new string length is equal to the appended string, it means they should equal
// when the finite duration is required, set it
if ( result == value.length && expiration.isFinite() ) redis.expire( key, expiration ) else Future.successful[ Unit ]( Unit )
}.recoverWithDone
}

def expire( key: String, expiration: Duration ) =
def expire( key: String, expiration: Duration ) = key.prefixed { key =>
redis.expire( key, expiration ).recoverWithDone
}

def matching( pattern: String ) =
def matching( pattern: String ) = pattern.prefixed { pattern =>
redis.matching( pattern ).recoverWithDefault( Seq.empty[ String ] )
}

def getOrElse[ T: ClassTag ]( key: String, expiration: Duration )( orElse: => T ) =
def getOrElse[ T: ClassTag ]( key: String, expiration: Duration )( orElse: => T ) = key.prefixed { key =>
getOrFuture( key, expiration )( orElse.toFuture ).recoverWithDefault( orElse )
}

def getOrFuture[ T: ClassTag ]( key: String, expiration: Duration )( orElse: => Future[ T ] ): Future[ T ] =
def getOrFuture[ T: ClassTag ]( key: String, expiration: Duration )( orElse: => Future[ T ] ): Future[ T ] = key.prefixed { key =>
redis.get[ T ]( key ).flatMap {
// cache hit, return the unwrapped value
case Some( value: T ) => value.toFuture
// cache miss, compute the value, store it into cache and return the value
case None => orElse flatMap ( value => redis.set( key, value, expiration ) map ( _ => value ) )
}.recoverWithFuture( orElse )
}

def remove( key: String ) =
def remove( key: String ) = key.prefixed { key =>
redis.remove( key ).recoverWithDone
}

def remove( key1: String, key2: String, keys: String* ) =
redis.remove( key1 +: key2 +: keys: _* ).recoverWithDone
def remove( key1: String, key2: String, keys: String* ) = ( key1 +: key2 +: keys ).prefixed { keys =>
redis.remove( keys: _* ).recoverWithDone
}

def removeAll( keys: String* ): Result[ Done ] =
def removeAll( keys: String* ): Result[ Done ] = keys.prefixed { keys =>
redis.remove( keys: _* ).recoverWithDone
}

def removeMatching( pattern: String ): Result[ Done ] =
def removeMatching( pattern: String ): Result[ Done ] = pattern.prefixed { pattern =>
redis.matching( pattern ).flatMap( keys => redis.remove( keys: _* ) ).recoverWithDone
}

def invalidate( ) =
redis.invalidate( ).recoverWithDone

def exists( key: String ) =
def exists( key: String ) = key.prefixed { key =>
redis.exists( key ).recoverWithDefault( false )
}

def increment( key: String, by: Long ) =
def increment( key: String, by: Long ) = key.prefixed { key =>
redis.increment( key, by ).recoverWithDefault( by )
}

def decrement( key: String, by: Long ) =
def decrement( key: String, by: Long ) = key.prefixed { key =>
increment( key, -by )
}

def list[ T: ClassTag ]( key: String ): RedisList[ T, Result ] =
def list[ T: ClassTag ]( key: String ): RedisList[ T, Result ] = key.prefixed { key =>
new RedisListImpl( key, redis )
}

def set[ T: ClassTag ]( key: String ): RedisSet[ T, Result ] =
def set[ T: ClassTag ]( key: String ): RedisSet[ T, Result ] = key.prefixed { key =>
new RedisSetImpl( key, redis )
}

def map[ T: ClassTag ]( key: String ): RedisMap[ T, Result ] =
def map[ T: ClassTag ]( key: String ): RedisMap[ T, Result ] = key.prefixed { key =>
new RedisMapImpl( key, redis )
}

override def toString = s"RedisCache(name=${ runtime.name })"
}
6 changes: 4 additions & 2 deletions src/main/scala/play/api/cache/redis/impl/RedisCaches.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ trait RedisCaches {
def javaAsync: play.cache.AsyncCacheApi
}

private[ redis ] class RedisCachesProvider( instance: RedisInstance, serializer: connector.AkkaSerializer, environment: Environment, recovery: RecoveryPolicyResolver )( implicit system: ActorSystem, lifecycle: ApplicationLifecycle ) extends Provider[ RedisCaches ] {
private implicit lazy val runtime: RedisRuntime = RedisRuntime( instance, recovery resolve instance.recovery )( system )
private[ redis ] class RedisCachesProvider( instance: RedisInstance, serializer: connector.AkkaSerializer, environment: Environment )( implicit system: ActorSystem, lifecycle: ApplicationLifecycle, recovery: RecoveryPolicyResolver ) extends Provider[ RedisCaches ] {
import RedisRuntime._

private implicit lazy val runtime: RedisRuntime = RedisRuntime( instance, instance.recovery, instance.prefix )( system )

private lazy val redisConnector = new connector.RedisConnectorProvider( instance, serializer ).get

Expand Down
22 changes: 22 additions & 0 deletions src/main/scala/play/api/cache/redis/impl/RedisPrefix.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package play.api.cache.redis.impl

/**
* Each instance can apply its own prefix, e.g., to use multiple instances
* with the same redis database.
*
* @author Karel Cemus
*/
sealed trait RedisPrefix extends Any {
@inline def prefixed( key: String ): String
@inline def prefixed( key: Seq[ String ] ): Seq[ String ]
}

class RedisPrefixImpl( val prefix: String ) extends AnyVal with RedisPrefix {
@inline def prefixed( key: String ) = s"$prefix:$key"
@inline def prefixed( keys: Seq[ String ] ) = keys.map( prefixed )
}

object RedisEmptyPrefix extends RedisPrefix {
@inline def prefixed( key: String ) = key
@inline def prefixed( keys: Seq[ String ] ) = keys
}
18 changes: 13 additions & 5 deletions src/main/scala/play/api/cache/redis/impl/RedisRuntime.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package play.api.cache.redis.impl

import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration
import scala.language.implicitConversions

import play.api.cache.redis._

Expand All @@ -15,16 +16,23 @@ import akka.actor.ActorSystem
*/
private[ redis ] trait RedisRuntime extends connector.RedisRuntime {
implicit def policy: RecoveryPolicy
implicit def prefix: RedisPrefix
implicit def timeout: akka.util.Timeout
}

private[ redis ] case class RedisRuntimeImpl( name: String, context: ExecutionContext, policy: RecoveryPolicy, timeout: akka.util.Timeout ) extends RedisRuntime
private[ redis ] case class RedisRuntimeImpl( name: String, context: ExecutionContext, policy: RecoveryPolicy, prefix: RedisPrefix, timeout: akka.util.Timeout ) extends RedisRuntime

private[ redis ] object RedisRuntime {

def apply( instance: RedisInstance, recovery: RecoveryPolicy )( implicit system: ActorSystem ): RedisRuntime =
apply( instance.name, instance.timeout, system.dispatchers.lookup( instance.invocationContext ), recovery )
implicit def string2prefix( prefix: Option[ String ] ): RedisPrefix =
prefix.fold[ RedisPrefix ]( RedisEmptyPrefix )( new RedisPrefixImpl( _ ) )

def apply( name: String, timeout: FiniteDuration, context: ExecutionContext, recovery: RecoveryPolicy ): RedisRuntime =
RedisRuntimeImpl( name, context, recovery, akka.util.Timeout( timeout ) )
implicit def string2recovery( policy: String )( implicit resolver: RecoveryPolicyResolver ): RecoveryPolicy =
resolver resolve policy

def apply( instance: RedisInstance, recovery: RecoveryPolicy, prefix: RedisPrefix )( implicit system: ActorSystem ): RedisRuntime =
apply( instance.name, instance.timeout, system.dispatchers.lookup( instance.invocationContext ), recovery, prefix )

def apply( name: String, timeout: FiniteDuration, context: ExecutionContext, recovery: RecoveryPolicy, prefix: RedisPrefix = RedisEmptyPrefix ): RedisRuntime =
RedisRuntimeImpl( name, context, recovery, prefix, akka.util.Timeout( timeout ) )
}
18 changes: 18 additions & 0 deletions src/main/scala/play/api/cache/redis/impl/dsl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import play.api.cache.redis._
private[ impl ] object dsl {

@inline implicit def runtime2context( implicit runtime: RedisRuntime ): ExecutionContext = runtime.context
@inline implicit def runtime2prefix( implicit runtime: RedisRuntime ): RedisPrefix = runtime.prefix

/** enriches any ref by toFuture converting a value to Future.successful */
implicit class RichFuture[ T ]( val any: T ) extends AnyVal {
Expand Down Expand Up @@ -43,4 +44,21 @@ private[ impl ] object dsl {

/** maps units into akka.Done */
@inline private def unitAsDone( unit: Unit ) = Done

/** applies prefixer to produce final cache key */
implicit class CacheKey( val key: String ) extends AnyVal {
def prefixed[ T ]( f: String => T )( implicit prefixer: RedisPrefix ): T = f( prefixer prefixed key )
}

/** applies prefixer to produce final cache key */
implicit class CacheKeys( val keys: Seq[ String ] ) extends AnyVal {
def prefixed[ T ]( f: Seq[ String ] => T )( implicit prefixer: RedisPrefix ): T = f( prefixer prefixed keys )
}

/** applies prefixer to produce final cache key */
implicit class CacheKeyValues[ X ]( val keys: Seq[ (String, X) ] ) extends AnyVal {
def prefixed[ T ]( f: Seq[ (String, X) ] => T )( implicit prefixer: RedisPrefix ): T = f {
keys.map { case (key, value) => prefixer.prefixed( key ) -> value }
}
}
}
3 changes: 2 additions & 1 deletion src/test/scala/play/api/cache/redis/Redis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ trait RedisSettings {

def port = 6379

def database = 1
def database = 0
}

/**
Expand Down Expand Up @@ -127,6 +127,7 @@ object Redis {
classOf[ impl.RedisListSpec ],
classOf[ impl.RedisMapSpecs ],
classOf[ impl.RedisSetSpecs ],
classOf[ impl.RedisPrefixSpec ],
classOf[ impl.SynchronousCacheSpec ],
classOf[ util.ExpirationSpec ],
classOf[ RedisComponentsSpecs ]
Expand Down
Loading