Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replaced SETNX and SETEX by atomic SET #159

Merged
merged 2 commits into from
Apr 16, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ Standalone client now fails eagerly when the connection to redis is not
established. This is to avoid long timeout while the rediscala is trying
to reconnect. [#147](https://github.com/KarelCemus/play-redis/issues/147)

Replaced `SETNX` and `SETEX` by `SET` operation with `EX` and `NX` parameters to
implement the set operation atomically. In consequence, slightly changed `RedisConnector` API.
[#156](https://github.com/KarelCemus/play-redis/issues/156)

Deprecated `timeout` property and replaced by `sync-timeout` with the identical
meaning and use. Will be removed by 2.2.0. [#154](https://github.com/KarelCemus/play-redis/issues/154)

Expand Down
17 changes: 5 additions & 12 deletions src/main/scala/play/api/cache/redis/connector/RedisConnector.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,20 +46,13 @@ private[ redis ] trait CoreCommands {

/** Set a value into the cache. Expiration time in seconds (0 second means eternity).
*
* @param key cache storage key
* @param value value to store
* @param expiration record duration in seconds
* @param key cache storage key
* @param value value to store
* @param expiration record duration in seconds
* @param ifNotExists set only if the key does not exist
* @return promise
*/
def set( key: String, value: Any, expiration: Duration = Duration.Inf ): Future[ Unit ]

/** Set a value into the cache, if the key is not used. Otherwise ignore.
*
* @param key cache storage key
* @param value value to set
* @return true if set was successful, false if key was already defined
*/
def setIfNotExists( key: String, value: Any ): Future[ Boolean ]
def set( key: String, value: Any, expiration: Duration = Duration.Inf, ifNotExists: Boolean = false ): Future[ Boolean ]

/** Set a value into the cache. Expiration time is the eternity.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,11 @@ private[ connector ] class RedisConnectorImpl( serializer: AkkaSerializer, redis
case ex => serializationFailed( key, "Deserialization failed", ex )
}.get

def set( key: String, value: Any, expiration: Duration ): Future[ Unit ] =
def set( key: String, value: Any, expiration: Duration, ifNotExists: Boolean ): Future[ Boolean ] =
// no value to set
if ( value == null ) remove( key )
// set for finite duration
else if ( expiration.isFinite() ) encode( key, value ) flatMap ( setTemporally( key, _, expiration ) )
// set for infinite duration
else encode( key, value ) flatMap ( setEternally( key, _ ) )
if ( value == null ) remove( key ).map( _ => true )
// set the value
else encode( key, value ) flatMap ( doSet( key, _, expiration, ifNotExists ) )

/** encodes the object, reports an exception if fails */
private def encode( key: String, value: Any ): Future[ String ] = Future.fromTry {
Expand All @@ -68,23 +66,17 @@ private[ connector ] class RedisConnectorImpl( serializer: AkkaSerializer, redis
}
}

/** temporally stores already encoded value into the storage */
private def setTemporally( key: String, value: String, expiration: Duration ): Future[ Unit ] =
redis.setex( key, expiration.toSeconds, value ) executing "SETEX" withKey key andParameters s"$value $expiration" expects {
case _ => log.debug( s"Set on key '$key' on $expiration seconds." )
}

/** eternally stores already encoded value into the storage */
private def setEternally( key: String, value: String ): Future[ Unit ] =
redis.set( key, value ) executing "SET" withKey key andParameter value expects {
case true => log.debug( s"Set on key '$key' for infinite seconds." )
case false => log.warn( s"Set on key '$key' failed. Condition was not met." )
}

def setIfNotExists( key: String, value: Any ): Future[ Boolean ] =
encode( key, value ).flatMap( redis.setnx( key, _ ) ) executing "SETNX" withKey key andParameter s"encoded($value)" expects {
case false => log.debug( s"Set if not exists on key '$key' ignored. Value already exists." ); false
case true => log.debug( s"Set if not exists on key '$key' succeeded." ); true
/** implements the advanced set operation storing already encoded value into the storage */
private def doSet( key: String, value: String, expiration: Duration, ifNotExists: Boolean ): Future[ Boolean ] =
redis.set(
key,
value,
exSeconds = if ( expiration.isFinite ) Some( expiration.toSeconds ) else None,
NX = ifNotExists
) executing "SET" withKey key andParameters s"$value${ s" EX $expiration" when expiration.isFinite }${ " NX" when ifNotExists }" expects {
case true if expiration.isFinite => log.debug( s"Set on key '$key' for ${ expiration.toSeconds } seconds." ); true
case true => log.debug( s"Set on key '$key' for infinite seconds." ); true
case false => log.debug( s"Set on key '$key' ignored. Condition was not met." ); false
}

def mSet( keyValues: (String, Any)* ): Future[ Unit ] = mSetUsing( mSetEternally, (), keyValues: _* )
Expand Down
4 changes: 4 additions & 0 deletions src/main/scala/play/api/cache/redis/connector/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,8 @@ package object connector {
@inline def asString = s"$key $value"
@inline def isNull = value == null
}

implicit class StringWhen( val value: String ) extends AnyVal {
def when( condition: Boolean ) = if ( condition ) value else ""
}
}
7 changes: 2 additions & 5 deletions src/main/scala/play/api/cache/redis/impl/RedisCache.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,11 @@ private[ impl ] class RedisCache[ Result[ _ ] ]( redis: RedisConnector, builder:
}

def set( key: String, value: Any, expiration: Duration ) = key.prefixed { key =>
redis.set( key, value, expiration ).recoverWithDone
redis.set( key, value, expiration ).map( _ => (): Unit ).recoverWithDone
}

def setIfNotExists( key: String, value: Any, expiration: Duration ) = key.prefixed { key =>
redis.setIfNotExists( key, value ).map { result =>
if ( result && expiration.isFinite( ) ) redis.expire( key, expiration )
result
}.recoverWithDefault( true )
redis.set( key, value, expiration, ifNotExists = true ).recoverWithDefault( true )
}

def setAll( keyValues: (String, Any)* ): Result[ Done ] = keyValues.prefixed { keyValues =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import org.specs2.specification.AfterAll
/**
* @author Karel Cemus
*/
class RedisCacheComponentsSpecs extends Specification with WithApplication with AfterAll {
class RedisCacheComponentsSpec extends Specification with WithApplication with AfterAll {

object components extends RedisCacheComponents {
def actorSystem = system
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import org.specs2.specification.{AfterAll, BeforeAll}
/**
* <p>Specification of the low level connector implementing basic commands</p>
*/
class RedisClusterSpecs( implicit ee: ExecutionEnv ) extends Specification with BeforeAll with AfterAll with WithApplication {
class RedisClusterSpec( implicit ee: ExecutionEnv ) extends Specification with BeforeAll with AfterAll with WithApplication {

import Implicits._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,25 +67,13 @@ class RedisConnectorFailureSpec( implicit ee: ExecutionEnv ) extends Specificati

"Connector failure" should {

"failed SETEX" in new MockedConnector {
serializer.encode( anyString ) returns "encoded"
commands.setex( anyString, anyLong, anyString )( anySerializer ) returns disconnected
// run the test
connector.set( key, value, 1.minute ) must throwA[ ExecutionFailedException ].await
}

"failed SETNX" in new MockedConnector {
serializer.encode( anyString ) returns "encoded"
commands.setnx( anyString, anyString )( anySerializer ) returns disconnected
// run the test
connector.setIfNotExists( key, value ) must throwA[ ExecutionFailedException ].await
}

"failed SET" in new MockedConnector {
serializer.encode( anyString ) returns "encoded"
commands.set( anyString, anyString, any, any, anyBoolean, anyBoolean )( anySerializer ) returns disconnected
// run the test
connector.set( key, value ) must throwA[ ExecutionFailedException ].await
connector.set( key, value, 1.minute ) must throwA[ ExecutionFailedException ].await
connector.set( key, value, ifNotExists = true ) must throwA[ ExecutionFailedException ].await
}

"failed MSET" in new MockedConnector {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,24 +38,34 @@ class RedisConnectorSpec( implicit ee: ExecutionEnv ) extends Specification with
}

"hit after set" in new TestCase {
connector.set( s"$prefix-$idx", "value" ).await
connector.set( s"$prefix-$idx", "value" ) must beTrue.await
connector.get[ String ]( s"$prefix-$idx" ) must beSome[ Any ].await
connector.get[ String ]( s"$prefix-$idx" ) must beSome( "value" ).await
}

"ignore set if not exists when already defined" in new TestCase {
connector.set( s"$prefix-if-not-exists-when-exists", "previous" ).await
connector.setIfNotExists( s"$prefix-if-not-exists-when-exists", "value" ) must beFalse.await
connector.get[ String ]( s"$prefix-if-not-exists-when-exists" ) must beSome[ Any ].await
connector.set( s"$prefix-if-not-exists-when-exists", "previous" ) must beTrue.await
connector.set( s"$prefix-if-not-exists-when-exists", "value", ifNotExists = true ) must beFalse.await
connector.get[ String ]( s"$prefix-if-not-exists-when-exists" ) must beSome( "previous" ).await
}

"perform set if not exists when undefined" in new TestCase {
connector.setIfNotExists( s"$prefix-if-not-exists", "value" ) must beTrue.await
connector.get[ String ]( s"$prefix-if-not-exists" ) must beSome[ Any ].await
connector.get[ String ]( s"$prefix-if-not-exists" ) must beNone.await
connector.set( s"$prefix-if-not-exists", "value", ifNotExists = true ) must beTrue.await
connector.get[ String ]( s"$prefix-if-not-exists" ) must beSome( "value" ).await
connector.set( s"$prefix-if-not-exists", "other", ifNotExists = true ) must beFalse.await
connector.get[ String ]( s"$prefix-if-not-exists" ) must beSome( "value" ).await
}

"perform set if not exists with expiration" in new TestCase {
connector.get[ String ]( s"$prefix-if-not-exists-with-expiration" ) must beNone.await
connector.set( s"$prefix-if-not-exists-with-expiration", "value", 2.seconds, ifNotExists = true ) must beTrue.await
connector.get[ String ]( s"$prefix-if-not-exists-with-expiration" ) must beSome( "value" ).await
// wait until the first duration expires
Future.after( 3 ) must not( throwA[ Throwable ] ).awaitFor( 4.seconds )
connector.get[ String ]( s"$prefix-if-not-exists-with-expiration" ) must beNone.await
}

"hit after mset" in new TestCase {
connector.mSet( s"$prefix-mset-$idx-1" -> "value-1", s"$prefix-mset-$idx-2" -> "value-2" ).await
connector.mGet[ String ]( s"$prefix-mset-$idx-1", s"$prefix-mset-$idx-2", s"$prefix-mset-$idx-3" ) must beEqualTo( List( Some( "value-1" ), Some( "value-2" ), None ) ).await
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class AsyncRedisSpecs( implicit ee: ExecutionEnv ) extends Specification with Re

"getOrElseUpdate (miss)" in new MockedAsyncRedis with OrElse {
connector.get[ String ]( anyString )( anyClassTag ) returns None
connector.set( anyString, anyString, any[ Duration ] ) returns unit
connector.set( anyString, anyString, any[ Duration ], anyBoolean ) returns true
cache.getOrElseUpdate( key )( doFuture( value ) ) must beEqualTo( value ).await
orElse mustEqual 1
}
Expand All @@ -54,16 +54,16 @@ class AsyncRedisSpecs( implicit ee: ExecutionEnv ) extends Specification with Re
def recoverFrom[ T ]( rerun: => Future[ T ], default: => Future[ T ], failure: RedisException ) = rerun
}
connector.get[ String ]( anyString )( anyClassTag ) returns None
connector.set( anyString, anyString, any[ Duration ] ) returns unit
connector.set( anyString, anyString, any[ Duration ], anyBoolean ) returns true
// run the test
cache.getOrElseUpdate( key ) { attempts match {
case 0 => attempt( failedFuture )
case _ => attempt( doFuture( value ) )
} } must beEqualTo( value ).await
// verification
orElse mustEqual 2
MockitoImplicits.there were MockitoImplicits.two( connector ).get[ String ]( anyString )( anyClassTag )
MockitoImplicits.there was MockitoImplicits.one( connector ).set( anyString, anyString, any[ Duration ] )
there were two( connector ).get[ String ]( anyString )( anyClassTag )
there was one( connector ).set( key, value, Duration.Inf, ifNotExists = false )
}
}
}
22 changes: 11 additions & 11 deletions src/test/scala/play/api/cache/redis/impl/RedisCacheSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -48,39 +48,39 @@ class RedisCacheSpec( implicit ee: ExecutionEnv ) extends Specification with Red
}

"set" in new MockedCache {
connector.set( anyString, anyString, any[ Duration ] ) returns unit
connector.set( anyString, anyString, any[ Duration ], beEq( false ) ) returns true
cache.set( key, value ) must beDone.await
}

"set recover with default" in new MockedCache {
connector.set( anyString, anyString, any[ Duration ] ) returns ex
connector.set( anyString, anyString, any[ Duration ], beEq( false ) ) returns ex
cache.set( key, value ) must beDone.await
}

"set if not exists (exists)" in new MockedCache {
connector.setIfNotExists( anyString, anyString ) returns false
connector.set( anyString, anyString, any[ Duration ], beEq( true ) ) returns false
cache.setIfNotExists( key, value ) must beFalse.await
}

"set if not exists (not exists)" in new MockedCache {
connector.setIfNotExists( anyString, anyString ) returns true
connector.set( anyString, anyString, any[ Duration ], beEq( true ) ) returns true
cache.setIfNotExists( key, value ) must beTrue.await
}

"set if not exists (exists) with expiration" in new MockedCache {
connector.setIfNotExists( anyString, anyString ) returns false
connector.set( anyString, anyString, any[ Duration ], beEq( true ) ) returns false
connector.expire( anyString, any[ Duration ] ) returns unit
cache.setIfNotExists( key, value, expiration ) must beFalse.await
}

"set if not exists (not exists) with expiration" in new MockedCache {
connector.setIfNotExists( anyString, anyString ) returns true
connector.set( anyString, anyString, any[ Duration ], beEq( true ) ) returns true
connector.expire( anyString, any[ Duration ] ) returns unit
cache.setIfNotExists( key, value, expiration ) must beTrue.await
}

"set if not exists recover with default" in new MockedCache {
connector.setIfNotExists( anyString, anyString ) returns ex
connector.set( anyString, anyString, any[ Duration ], beEq( true ) ) returns ex
cache.setIfNotExists( key, value ) must beTrue.await
}

Expand Down Expand Up @@ -153,7 +153,7 @@ class RedisCacheSpec( implicit ee: ExecutionEnv ) extends Specification with Red

"get or else (miss)" in new MockedCache with OrElse {
connector.get[ String ]( anyString )( anyClassTag ) returns None
connector.set( anyString, anyString, any[ Duration ] ) returns unit
connector.set( anyString, anyString, any[ Duration ], beEq( false ) ) returns true
cache.getOrElse( key )( doElse( value ) ) must beEqualTo( value ).await
orElse mustEqual 1
}
Expand All @@ -172,7 +172,7 @@ class RedisCacheSpec( implicit ee: ExecutionEnv ) extends Specification with Red

"get or future (miss)" in new MockedCache with OrElse {
connector.get[ String ]( anyString )( anyClassTag ) returns None
connector.set( anyString, anyString, any[ Duration ] ) returns unit
connector.set( anyString, anyString, any[ Duration ], beEq( false ) ) returns true
cache.getOrFuture( key )( doFuture( value ) ) must beEqualTo( value ).await
orElse mustEqual 1
}
Expand All @@ -194,7 +194,7 @@ class RedisCacheSpec( implicit ee: ExecutionEnv ) extends Specification with Red
def recoverFrom[ T ]( rerun: => Future[ T ], default: => Future[ T ], failure: RedisException ) = rerun
}
connector.get[ String ]( anyString )( anyClassTag ) returns None
connector.set( anyString, anyString, any[ Duration ] ) returns unit
connector.set( anyString, anyString, any[ Duration ], beEq( false ) ) returns true
// run the test
cache.getOrFuture( key ) { attempts match {
case 0 => attempt( failedFuture )
Expand All @@ -203,7 +203,7 @@ class RedisCacheSpec( implicit ee: ExecutionEnv ) extends Specification with Red
// verification
orElse mustEqual 2
there were two( connector ).get[ String ]( anyString )( anyClassTag )
there was one( connector ).set( anyString, anyString, any[ Duration ] )
there was one( connector ).set( key, value, Duration.Inf, ifNotExists = false )
}

"remove" in new MockedCache {
Expand Down
4 changes: 2 additions & 2 deletions src/test/scala/play/api/cache/redis/impl/SyncRedisSpecs.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ class SyncRedisSpecs( implicit ee: ExecutionEnv ) extends Specification with Red

"get or else (miss)" in new MockedSyncRedis with OrElse {
connector.get[ String ]( anyString )( anyClassTag ) returns None
connector.set( anyString, anyString, any[ Duration ] ) returns unit
connector.set( anyString, anyString, any[ Duration ], anyBoolean ) returns true
cache.getOrElse( key )( doElse( value ) ) must beEqualTo( value )
orElse mustEqual 1
}

"get or else (failure)" in new MockedSyncRedis with OrElse {
connector.get[ String ]( anyString )( anyClassTag ) returns ex
connector.set( anyString, anyString, any[ Duration ] ) returns unit
connector.set( anyString, anyString, any[ Duration ], anyBoolean ) returns true
cache.getOrElse( key )( doElse( value ) ) must beEqualTo( value )
orElse mustEqual 1
}
Expand Down