Skip to content

Commit

Permalink
Merge pull request #885 from abersnaze/observable-string-from
Browse files Browse the repository at this point in the history
Fixed an issue with the from(Reader) added a bunch of unit tests.
  • Loading branch information
benjchristensen committed Feb 18, 2014
2 parents ea9b73a + 7ffb0ef commit 89ff375
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.nio.charset.CoderResult;
import java.nio.charset.CodingErrorAction;
import java.util.Arrays;
import java.util.Objects;
import java.util.regex.Pattern;

import rx.Observable;
Expand All @@ -37,10 +38,28 @@
import rx.functions.Func2;

public class StringObservable {
/**
* Reads from the bytes from a source {@link InputStream} and outputs {@link Observable} of
* {@link byte[]}s
*
* @param i
* Source {@link InputStream}
* @return
*/
public static Observable<byte[]> from(final InputStream i) {
return from(i, 8 * 1024);
}

/**
* Reads from the bytes from a source {@link InputStream} and outputs {@link Observable} of
* {@link byte[]}s
*
* @param i
* Source {@link InputStream}
* @param size
* internal buffer size
* @return
*/
public static Observable<byte[]> from(final InputStream i, final int size) {
return Observable.create(new OnSubscribe<byte[]>() {
@Override
Expand All @@ -65,10 +84,28 @@ public void call(Subscriber<? super byte[]> o) {
});
}

/**
* Reads from the characters from a source {@link Reader} and outputs {@link Observable} of
* {@link String}s
*
* @param i
* Source {@link Reader}
* @return
*/
public static Observable<String> from(final Reader i) {
return from(i, 8 * 1024);
}

/**
* Reads from the characters from a source {@link Reader} and outputs {@link Observable} of
* {@link String}s
*
* @param i
* Source {@link Reader}
* @param size
* internal buffer size
* @return
*/
public static Observable<String> from(final Reader i, final int size) {
return Observable.create(new OnSubscribe<String>() {
@Override
Expand All @@ -80,7 +117,7 @@ public void call(Subscriber<? super String> o) {
int n = 0;
n = i.read(buffer);
while (n != -1 && !o.isUnsubscribed()) {
o.onNext(new String(buffer));
o.onNext(new String(buffer, 0, n));
n = i.read(buffer);
}
} catch (IOException e) {
Expand Down Expand Up @@ -119,7 +156,7 @@ public static Observable<String> decode(Observable<byte[]> src, Charset charset)

/**
* Decodes a stream the multibyte chunks into a stream of strings that works on infinite streams
* and where handles when a multibyte character spans two chunks.
* and where it handles when a multibyte character spans two chunks.
* This method allows for more control over how malformed and unmappable characters are handled.
*
* @param src
Expand Down Expand Up @@ -151,6 +188,9 @@ public void onNext(byte[] bytes) {
}

public boolean process(byte[] next, ByteBuffer last, boolean endOfInput) {
if (o.isUnsubscribed())
return false;

ByteBuffer bb;
if (last != null) {
if (next != null) {
Expand Down Expand Up @@ -270,8 +310,10 @@ public String call(String a, String b) {
/**
* Rechunks the strings based on a regex pattern and works on infinite stream.
*
* resplit(["boo:an", "d:foo"], ":") --> ["boo", "and", "foo"]
* resplit(["boo:an", "d:foo"], "o") --> ["b", "", ":and:f", "", ""]
* <pre>
* split(["boo:an", "d:foo"], ":") --> ["boo", "and", "foo"]
* split(["boo:an", "d:foo"], "o") --> ["b", "", ":and:f", "", ""]
* </pre>
*
* See {@link Pattern}
*
Expand Down Expand Up @@ -399,4 +441,56 @@ public void onNext(Object t) {
}
});
}

public final static class Line {
private final int number;
private final String text;

public Line(int number, String text) {
this.number = number;
this.text = text;
}

public int getNumber() {
return number;
}

public String getText() {
return text;
}

@Override
public int hashCode() {
return Objects.hash(number, text);
}

@Override
public boolean equals(Object obj) {
if (!(obj instanceof Line))
return false;
return Objects.equals(number, ((Line) obj).number) && Objects.equals(text, ((Line) obj).text);
}

@Override
public String toString() {
return number + ":" + text;
}
}

/**
* Splits the {@link Observable} of Strings by lines and numbers them (zero based index)
*
* @param source
* @return
*/
public static Observable<Line> byLine(Observable<String> source) {
return split(source, System.getProperty("line.separator")).map(new Func1<String, Line>() {
int lineNumber = 0;

@Override
public Line call(String text) {
return new Line(lineNumber++, text);
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,23 @@
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.StringReader;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.MalformedInputException;
import java.util.Arrays;
import java.util.List;

import junit.framework.Assert;

import org.junit.Test;

import rx.Observable;
import rx.Observer;
import rx.observables.StringObservable.Line;
import rx.observers.TestObserver;
import rx.util.AssertObservable;

Expand Down Expand Up @@ -221,4 +228,27 @@ public void testJoinThrows() {
verify(observer, never()).onCompleted();
verify(observer, times(1)).onError(any(Throwable.class));
}

@Test
public void testFromInputStream() {
final byte[] inBytes = "test".getBytes();
final byte[] outBytes = StringObservable.from(new ByteArrayInputStream(inBytes)).toBlockingObservable().single();
assertNotSame(inBytes, outBytes);
assertArrayEquals(inBytes, outBytes);
}

@Test
public void testFromReader() {
final String inStr = "test";
final String outStr = StringObservable.from(new StringReader(inStr)).toBlockingObservable().single();
assertNotSame(inStr, outStr);
assertEquals(inStr, outStr);
}

@Test
public void testByLine() {
List<Line> lines = StringObservable.byLine(Observable.from(Arrays.asList("qwer", "\nasdf\n", "zx", "cv"))).toList().toBlockingObservable().single();

assertEquals(Arrays.asList(new Line(0, "qwer"), new Line(1, "asdf"), new Line(2, "zxcv")), lines);
}
}

0 comments on commit 89ff375

Please sign in to comment.