Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support file write in binding-filesystem & binding-http-filesystem #1300

Merged
merged 13 commits into from
Nov 4, 2024
2 changes: 1 addition & 1 deletion runtime/binding-filesystem/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
</licenses>

<properties>
<jacoco.coverage.ratio>0.81</jacoco.coverage.ratio>
<jacoco.coverage.ratio>0.80</jacoco.coverage.ratio>
<jacoco.missed.count>0</jacoco.missed.count>
</properties>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import java.nio.file.attribute.BasicFileAttributes;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Objects;
import java.util.function.LongFunction;
import java.util.function.LongUnaryOperator;
import java.util.function.Supplier;
Expand All @@ -61,6 +60,7 @@
import io.aklivity.zilla.runtime.binding.filesystem.internal.types.stream.DataFW;
import io.aklivity.zilla.runtime.binding.filesystem.internal.types.stream.EndFW;
import io.aklivity.zilla.runtime.binding.filesystem.internal.types.stream.FileSystemBeginExFW;
import io.aklivity.zilla.runtime.binding.filesystem.internal.types.stream.FileSystemResetExFW;
import io.aklivity.zilla.runtime.binding.filesystem.internal.types.stream.ResetFW;
import io.aklivity.zilla.runtime.binding.filesystem.internal.types.stream.SignalFW;
import io.aklivity.zilla.runtime.binding.filesystem.internal.types.stream.WindowFW;
Expand All @@ -81,7 +81,13 @@ public final class FileSystemServerFactory implements FileSystemStreamFactory
private static final int READ_PAYLOAD_MASK = 1 << FileSystemCapabilities.READ_PAYLOAD.ordinal();
private static final int WRITE_PAYLOAD_MASK = 1 << FileSystemCapabilities.WRITE_PAYLOAD.ordinal();
private static final int CREATE_PAYLOAD_MASK = 1 << FileSystemCapabilities.CREATE_PAYLOAD.ordinal();
private static final int DELETE_PAYLOAD_MASK = 1 << FileSystemCapabilities.DELETE_PAYLOAD.ordinal();
private static final String DEFAULT_TAG = "";
private static final String DEFAULT_CONTENT_TYPE = "application/octet-stream";
private static final String ERROR_PRECONDITION_FAILED = "Precondition Failed";
private static final String ERROR_CONFLICT = "Conflict";
private static final String ERROR_PRECONDITION_REQUIRED = "Precondition Required";
private static final String ERROR_NOT_FOUND = "Not Found";
private static final int TIMEOUT_EXPIRED_SIGNAL_ID = 0;
public static final int FILE_CHANGED_SIGNAL_ID = 1;
private static final int FLAG_FIN = 0x01;
Expand All @@ -105,6 +111,7 @@ public final class FileSystemServerFactory implements FileSystemStreamFactory
private final WindowFW.Builder windowRW = new WindowFW.Builder();

private final FileSystemBeginExFW.Builder beginExRW = new FileSystemBeginExFW.Builder();
private final FileSystemResetExFW.Builder resetExRW = new FileSystemResetExFW.Builder();
private final OctetsFW payloadRO = new OctetsFW();

private final Long2ObjectHashMap<FileSystemBindingConfig> bindings;
Expand Down Expand Up @@ -195,7 +202,7 @@ public MessageConsumer newStream(
final String tag = beginEx.tag().asString();
try
{
if (writeOrCreateOperation(capabilities))
if (writeOperation(capabilities))
{
String type = probeContentTypeOrDefault(path);
newStream = new FileSystemServerWriter(
Expand Down Expand Up @@ -635,7 +642,8 @@ private void doAppReset(
{
state = FileSystemState.closeInitial(state);

doReset(app, originId, routedId, initialId, initialSeq, initialAck, initialMax, traceId, 0L);
doReset(app, originId, routedId, initialId, initialSeq, initialAck, initialMax,
traceId, 0L, EMPTY_EXTENSION);
}
}

Expand Down Expand Up @@ -770,7 +778,7 @@ private FileSystemServerWriter(
this.relativePath = relativePath;
this.resolvedPath = Paths.get(resolvedPath);
this.capabilities = capabilities;
this.tag = tag;
this.tag = DEFAULT_TAG.equals(tag) ? null : tag;
this.initialMax = decodeMax;
}

Expand Down Expand Up @@ -829,15 +837,50 @@ private void onAppBegin(

state = FileSystemState.openingInitial(state);

doAppWindow(traceId);
String error = null;

String currentTag = calculateTag();
if (!Objects.equals(currentTag, tag) ||
!(canCreatePayload(capabilities, resolvedPath) ||
canWritePayload(capabilities, resolvedPath)))
if ((capabilities & CREATE_PAYLOAD_MASK) != 0 && Files.exists(resolvedPath))
{
cleanup(traceId);
error = ERROR_CONFLICT;
}
else if ((capabilities & WRITE_PAYLOAD_MASK) != 0)
{
if (Files.notExists(resolvedPath))
{
error = ERROR_NOT_FOUND;
}
else if (tag == null)
{
error = ERROR_PRECONDITION_REQUIRED;
}
else if (!validateTag())
{
error = ERROR_PRECONDITION_FAILED;
}
}
else if ((capabilities & DELETE_PAYLOAD_MASK) != 0)
{
if (Files.notExists(resolvedPath))
{
error = ERROR_NOT_FOUND;

}
else if (tag != null && !validateTag())
{
error = ERROR_PRECONDITION_FAILED;
}
else
{
delete(traceId);
}
}

if (error != null)
{
doAppReset(traceId, error);
}

doAppWindow(traceId);
}

private void onAppData(
Expand Down Expand Up @@ -981,7 +1024,7 @@ private void onAppReset(

cleanupTmpFileIfExists();

doAppReset(traceId);
doAppReset(traceId, null);
}

private void doAppBegin(
Expand Down Expand Up @@ -1029,23 +1072,37 @@ private void doAppAbort(
}

private void doAppReset(
long traceId)
long traceId,
String error)
{
if (FileSystemState.initialOpening(state) && !FileSystemState.initialClosed(state))
{
state = FileSystemState.closeInitial(state);

doReset(app, originId, routedId, initialId, initialSeq, initialAck, initialMax, traceId, 0L);
FileSystemResetExFW.Builder extension = resetExRW
.wrap(extBuffer, 0, extBuffer.capacity())
.typeId(fileSystemTypeId);

if (error != null)
{
resetExRW.error(error);
}

doReset(app, originId, routedId, initialId, initialSeq, initialAck, initialMax,
traceId, 0L, extension.build());
}
}

private void doAppWindow(
long traceId)
{
state = FileSystemState.openInitial(state);
if (!FileSystemState.initialClosed(state))
{
state = FileSystemState.openInitial(state);

doWindow(app, originId, routedId, initialId, initialSeq, initialAck, initialMax, traceId,
0L, 0L, 0);
doWindow(app, originId, routedId, initialId, initialSeq, initialAck, initialMax, traceId,
0L, 0L, 0);
}
}

private void cleanup(
Expand All @@ -1054,7 +1111,7 @@ private void cleanup(
cleanupTmpFileIfExists();

doAppAbort(traceId);
doAppReset(traceId);
doAppReset(traceId, null);
}

private void cleanupTmpFileIfExists()
Expand Down Expand Up @@ -1098,6 +1155,11 @@ private String calculateTag()
return newTag;
}

private boolean validateTag()
{
return tag.equals(calculateTag());
}

private InputStream getInputStream()
{
InputStream input = null;
Expand Down Expand Up @@ -1133,6 +1195,22 @@ else if (canWritePayload(capabilities, resolvedPath))
}
return output;
}

private void delete(
long traceId)
{
try
{
Files.delete(resolvedPath);
doAppWindow(traceId);
doAppBegin(traceId, null);
doAppEnd(traceId);
}
catch (IOException ex)
{
cleanup(traceId);
}
}
}

private void doBegin(
Expand Down Expand Up @@ -1260,7 +1338,8 @@ private void doReset(
long acknowledge,
int maximum,
long traceId,
long authorization)
long authorization,
Flyweight extension)
{
final ResetFW reset = resetRW.wrap(writeBuffer, 0, writeBuffer.capacity())
.originId(originId)
Expand All @@ -1271,6 +1350,7 @@ private void doReset(
.maximum(maximum)
.traceId(traceId)
.authorization(authorization)
.extension(extension.buffer(), extension.offset(), extension.sizeof())
.build();

receiver.accept(reset.typeId(), reset.buffer(), reset.offset(), reset.sizeof());
Expand Down Expand Up @@ -1325,9 +1405,18 @@ private boolean canCreatePayload(
return (capabilities & CREATE_PAYLOAD_MASK) != 0 && Files.notExists(path);
}

private boolean writeOrCreateOperation(
private boolean canDeletePayload(
int capabilities,
Path path)
{
return (capabilities & DELETE_PAYLOAD_MASK) != 0 && Files.exists(path);
}

private boolean writeOperation(
int capabilities)
{
return (capabilities & CREATE_PAYLOAD_MASK) != 0 || (capabilities & WRITE_PAYLOAD_MASK) != 0;
return (capabilities & CREATE_PAYLOAD_MASK) != 0 ||
(capabilities & WRITE_PAYLOAD_MASK) != 0 ||
(capabilities & DELETE_PAYLOAD_MASK) != 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,9 @@ public void shouldReadFilePayloadModifiedMultiClient() throws Exception
@Test
@Configuration("server.yaml")
@Specification({
"${app}/read.file.payload.etag.not.matched/client"
"${app}/read.file.payload.tag.not.matched/client"
})
public void shouldReadFilePayloadEtagNotMatched() throws Exception
public void shouldReadFilePayloadTagNotMatched() throws Exception
{
k3po.finish();
}
Expand Down Expand Up @@ -321,4 +321,36 @@ public void shouldWriteFilePayloadInterrupt() throws Exception

k3po.finish();
}

@Test
@Configuration("server.yaml")
@Specification({
"${app}/delete.file.payload/client",
})
public void shouldDeleteFilePayload() throws Exception
{
Path targetDirectory = Paths.get("target/files").toAbsolutePath();
Path indexFile = targetDirectory.resolve("error.html");

Files.createDirectories(targetDirectory);

Files.write(indexFile, """
<html>
<head><title>Welcome</title></head>
<body>Hello, world</body>
</html>
""".getBytes());

k3po.finish();
}

@Test
@Configuration("server.yaml")
@Specification({
"${app}/delete.file.payload.failed/client",
})
public void shouldRejectDeleteFilePayload() throws Exception
{
k3po.finish();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package io.aklivity.zilla.runtime.binding.http.filesystem.internal.config;

import static io.aklivity.zilla.runtime.binding.http.filesystem.internal.types.FileSystemCapabilities.CREATE_PAYLOAD;
import static io.aklivity.zilla.runtime.binding.http.filesystem.internal.types.FileSystemCapabilities.DELETE_PAYLOAD;
import static io.aklivity.zilla.runtime.binding.http.filesystem.internal.types.FileSystemCapabilities.READ_EXTENSION;
import static io.aklivity.zilla.runtime.binding.http.filesystem.internal.types.FileSystemCapabilities.READ_PAYLOAD;
import static io.aklivity.zilla.runtime.binding.http.filesystem.internal.types.FileSystemCapabilities.WRITE_PAYLOAD;
Expand All @@ -36,6 +37,7 @@ public final class HttpFileSystemWithResolver
private static final int HEADER_METHOD_MASK_GET = 1 << READ_PAYLOAD.ordinal() | 1 << READ_EXTENSION.ordinal();
public static final int HEADER_METHOD_MASK_POST = 1 << CREATE_PAYLOAD.ordinal();
public static final int HEADER_METHOD_MASK_PUT = 1 << WRITE_PAYLOAD.ordinal();
public static final int HEADER_METHOD_MASK_DELETE = 1 << DELETE_PAYLOAD.ordinal();

private static final Pattern PARAMS_PATTERN = Pattern.compile("\\$\\{params\\.([a-zA-Z_]+)\\}");
private static final Pattern PREFER_WAIT_PATTERN = Pattern.compile("wait=(\\d+)");
Expand All @@ -47,6 +49,7 @@ public final class HttpFileSystemWithResolver
private static final String16FW HEADER_METHOD_VALUE_HEAD = new String16FW("HEAD");
private static final String16FW HEADER_METHOD_VALUE_POST = new String16FW("POST");
private static final String16FW HEADER_METHOD_VALUE_PUT = new String16FW("PUT");
private static final String16FW HEADER_METHOD_VALUE_DELETE = new String16FW("DELETE");

private final String16FW etagRO = new String16FW();
private final HttpFileSystemWithConfig with;
Expand Down Expand Up @@ -101,20 +104,20 @@ else if (HEADER_METHOD_VALUE_POST.equals(method.value()))
else if (HEADER_METHOD_VALUE_PUT.equals(method.value()))
{
capabilities = HEADER_METHOD_MASK_PUT;
HttpHeaderFW ifMatched = httpBeginEx.headers().matchFirst(h -> HEADER_IF_MATCH_NAME.equals(h.name()));
if (ifMatched != null)
{
String16FW value = ifMatched.value();
etag = etagRO.wrap(value.buffer(), value.offset(), value.limit());
}
}
else if (HEADER_METHOD_VALUE_DELETE.equals(method.value()))
{
capabilities = HEADER_METHOD_MASK_DELETE;
}
}
HttpHeaderFW ifNotMatched = httpBeginEx.headers().matchFirst(h -> HEADER_IF_NONE_MATCH_NAME.equals(h.name()));
if (ifNotMatched != null)
HttpHeaderFW tag = httpBeginEx.headers().matchFirst(h ->
HEADER_IF_MATCH_NAME.equals(h.name()) || HEADER_IF_NONE_MATCH_NAME.equals(h.name()));
if (tag != null)
{
String16FW value = ifNotMatched.value();
String16FW value = tag.value();
etag = etagRO.wrap(value.buffer(), value.offset(), value.limit());
}

HttpHeaderFW prefer = httpBeginEx.headers().matchFirst(h -> HEADER_PREFER_NAME.equals(h.name()));
int wait = 0;
if (prefer != null)
Expand Down
Loading