Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds a cache validation step for sending #206

Draft
wants to merge 1 commit into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/Speckle.Sdk/SQLite/CacheDbCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ public enum CacheOperation
InsertOrReplace,
Has,
Get,
GetBatch,
Delete,
GetAll,
BulkInsertOrIgnore,
Expand All @@ -28,6 +29,7 @@ static CacheDbCommands()
Commands[(int)CacheOperation.Has] = "SELECT 1 FROM objects WHERE hash = @hash LIMIT 1";
Commands[(int)CacheOperation.Get] = "SELECT content FROM objects WHERE hash = @hash LIMIT 1";
Commands[(int)CacheOperation.Delete] = "DELETE FROM objects WHERE hash = @hash";
Commands[(int)CacheOperation.GetBatch] = "SELECT hash, content FROM objects WHERE hash IN ";
Commands[(int)CacheOperation.GetAll] = "SELECT hash, content FROM objects";

Commands[(int)CacheOperation.BulkInsertOrIgnore] = "INSERT OR IGNORE INTO objects (hash, content) VALUES ";
Expand Down
36 changes: 36 additions & 0 deletions src/Speckle.Sdk/SQLite/SQLiteJsonCacheManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,42 @@ public void DeleteObject(string id) =>
return (string?)command.ExecuteScalar();
}
);

public IReadOnlyCollection<(string Id, string Json)> GetObjects(string[] ids) =>
_pool.Use(
CacheOperation.GetBatch,
command =>
{
CreateBulkGet(command, ids);
var list = new HashSet<(string, string)>();
using var reader = command.ExecuteReader();
while (reader.Read())
{
list.Add((reader.GetString(0), reader.GetString(1)));
}
return list;
}
);

private void CreateBulkGet(SqliteCommand cmd, string[] ids)
{
StringBuilder sb = Pools.StringBuilders.Get();
sb.AppendLine(CacheDbCommands.Commands[(int)CacheOperation.GetBatch]);
int i = 0;
sb.Append('(');
foreach (var id in ids)
{
sb.Append($"@key{i},");
cmd.Parameters.AddWithValue($"@key{i}", id);
i++;
}
sb.Remove(sb.Length - 1, 1);
sb.Append(')');
#pragma warning disable CA2100
cmd.CommandText = sb.ToString();
#pragma warning restore CA2100
Pools.StringBuilders.Return(sb);
}

//This does an insert or ignores if already exists
public void SaveObject(string id, string json) =>
Expand Down
21 changes: 21 additions & 0 deletions src/Speckle.Sdk/Serialisation/V2/AsyncExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,25 @@ public static async ValueTask<TItem> FirstAsync<TItem>(this IAsyncEnumerable<TIt
}
throw new InvalidOperationException("Sequence contains no elements");
}

#if NETSTANDARD2_0
public static IEnumerable<T[]> Chunk<T>(this IEnumerable<T> source, int chunkSize)
{
List<T> list = new(chunkSize);
foreach(T item in source)
{
list.Add(item);
if(list.Count == chunkSize)
{
yield return list.ToArray();
list = new List<T>(chunkSize);
}
}
//don't forget the last one!
if(list.Count != 0)
{
yield return list.ToArray();
}
}
#endif
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public void Dispose() { }
public void SaveObjects(IEnumerable<(string id, string json)> items) => throw new NotImplementedException();

public bool HasObject(string objectId) => throw new NotImplementedException();
public IReadOnlyCollection<(string Id, string Json)> GetObjects(string[] ids) => throw new NotImplementedException();
}

public class DummySendServerObjectManager : IServerObjectManager
Expand Down
26 changes: 26 additions & 0 deletions src/Speckle.Sdk/Serialisation/V2/Send/SerializeProcess.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using Speckle.Sdk.Dependencies;
using Speckle.Sdk.Dependencies.Serialization;
using Speckle.Sdk.Models;
using Speckle.Sdk.Serialisation.Utilities;
using Speckle.Sdk.SQLite;
using Speckle.Sdk.Transports;
using Closures = System.Collections.Generic.Dictionary<Speckle.Sdk.Serialisation.Id, int>;
Expand Down Expand Up @@ -55,6 +56,8 @@ public sealed class SerializeProcess(
private long _uploaded;
private long _cached;

private long _validating;

[AutoInterfaceIgnore]
public void Dispose()
{
Expand All @@ -81,9 +84,32 @@ public async Task<SerializeProcessResults> Serialize(Base root, CancellationToke
await Done().ConfigureAwait(true);
await channelTask.ConfigureAwait(false);
await findTotalObjectsTask.ConfigureAwait(false);
if (!_options.SkipFindTotalObjects)
{
var task = Task.Factory.StartNew(() => Validate(root.id.NotNull()), cancellationToken, TaskCreationOptions.AttachedToParent | TaskCreationOptions.LongRunning, TaskScheduler.Default );
await task.ConfigureAwait(true);
}
return new(root.id.NotNull(), _objectReferences.Freeze());
}

private void Validate(string rootId)
{
var root = sqLiteJsonCacheManager.GetObject(rootId).NotNull();
var childIds = ClosureParser.GetChildrenIds(root);

Parallel.ForEach(childIds.Chunk(200), new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount },
batch =>
{
var objects = sqLiteJsonCacheManager.GetObjects(batch);
var missing = batch.Except(objects.Select(x => x.Id)).FirstOrDefault();
if (missing != null)
{
throw new SpeckleException($"Object(s) {string.Join(",", batch.Except(objects.Select(x => x.Id)))} not found in cache.");
}
Interlocked.Exchange(ref _validating, _validating + objects.Count);
progress?.Report(new(ProgressEvent.ValidatingObjects, _validating, _objectsFound));
});
}
private void TraverseTotal(Base obj)
{
foreach (var child in baseChildFinder.GetChildren(obj))
Expand Down
1 change: 1 addition & 0 deletions src/Speckle.Sdk/Transports/ProgressArgs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ public enum ProgressEvent
FindingChildren,
UploadBytes,
UploadedObjects,
ValidatingObjects,

CacheCheck,
DownloadBytes,
Expand Down
1 change: 1 addition & 0 deletions tests/Speckle.Sdk.Serialization.Tests/DetachedTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,7 @@ public void Dispose() { }
public void DeleteObject(string id) => throw new NotImplementedException();

public string? GetObject(string id) => null;
public IReadOnlyCollection<(string Id, string Json)> GetObjects(string[] ids) => throw new NotImplementedException();

public void SaveObject(string id, string json) => throw new NotImplementedException();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ public void Dispose() { }
public void DeleteObject(string id) => throw new NotImplementedException();

public string? GetObject(string id) => savedObjects.GetValueOrDefault(id);
public IReadOnlyCollection<(string Id, string Json)> GetObjects(string[] ids) => throw new NotImplementedException();

public void SaveObject(string id, string json) => throw new NotImplementedException();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ namespace Speckle.Sdk.Serialization.Tests;
public class DummySqLiteSendManager : ISqLiteJsonCacheManager
{
public string? GetObject(string id) => throw new NotImplementedException();
public IReadOnlyCollection<(string Id, string Json)> GetObjects(string[] ids) => throw new NotImplementedException();

public void SaveObject(string id, string json) => throw new NotImplementedException();

Expand Down
Loading