Skip to content
This repository has been archived by the owner on Mar 11, 2024. It is now read-only.

Commit

Permalink
Merge pull request #93 from lofifnc/fix/parrallel_output
Browse files Browse the repository at this point in the history
Refine protokoll to indentify out of order messages
  • Loading branch information
Alexander Kolb authored Apr 25, 2019
2 parents 7bd63a0 + 7e97df8 commit 26ad23e
Show file tree
Hide file tree
Showing 8 changed files with 42 additions and 84 deletions.
42 changes: 0 additions & 42 deletions .circleci/config.yml

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,9 @@ private Action processMessage(byte[] bytes)
//--> register the index of the closed sink instance.
msg = new String(bytes, "UTF-8");
int sinkIndex = Integer.parseInt(msg.split(" ")[1]);
int countRecords = Integer.parseInt(msg.split(" ")[2]);
int numSubTasks = Integer.parseInt(msg.split(" ")[2]);
int countRecords = Integer.parseInt(msg.split(" ")[3]);
parallelism = numSubTasks;
expectedNumRecords += countRecords;
closedSinks.add(sinkIndex);
break;
Expand All @@ -213,16 +215,16 @@ private Action processMessage(byte[] bytes)
//check if all sink instances have been closed.

if(numRecords == expectedNumRecords) {
if (closedSinks.size() == parallelism) {
//finish the listening process
return Action.FINISH;

} else if(closedSinks.size() >= parallelism
if(closedSinks.size() == parallelism
&& expectedNumRecords == 0) {
//stream with no output will not open the sink
//so verifier has to be opened manually
verifier.init();
return Action.FINISH;
} else if (closedSinks.size() == parallelism) {
//finish the listening process
return Action.FINISH;

}
}
return Action.CONTINUE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public class OutputPublisher {
private static final EventTranslatorOneArg<OutputEvent, ByteBuffer> TRANSLATOR =
new ByteEventTranslator();
private final RingBuffer<OutputEvent> ringBuffer;
private int instance;
private final int instance;
private AtomicInteger msgCount = new AtomicInteger(0);
private Set<Integer> closed = new HashSet<Integer>();
public OutputPublisher(int instance, RingBuffer<OutputEvent> buffer) {
Expand Down Expand Up @@ -83,11 +83,11 @@ public void sendRecord(byte[] bytes) {
*
* @param taskNumber index of the subtask.
*/
public void sendClose(int taskNumber) {
public void sendClose(int taskNumber, int numTasks) {
// System.out.println("close taskNumber = " + taskNumber);
if (!closed.contains(taskNumber)) {
String close = String.format("CLOSE %d %d",
taskNumber, msgCount.get());
String close = String.format("CLOSE %d %d %d",
taskNumber, numTasks, msgCount.get());

queueMessage(close.getBytes());
closed.add(taskNumber);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class OutputHandlerSpec extends CoreSpec {
sendString(publisher, "1")
sendString(publisher, "2")
sendString(publisher, "3")
publisher.send("CLOSE 0 3")
publisher.send("CLOSE 0 1 3")

listener.call() shouldBe ResultState.SUCCESS

Expand All @@ -60,7 +60,7 @@ class OutputHandlerSpec extends CoreSpec {
val listener = new OutputHandler[String](subscriber, disruptor, verifier, trigger)
disruptor.start()

publisher.send("CLOSE 0 0")
publisher.send("CLOSE 0 1 0")

listener.call() shouldBe ResultState.SUCCESS

Expand All @@ -80,11 +80,11 @@ class OutputHandlerSpec extends CoreSpec {
publisher.send(ser("OPEN 1 3 "))
publisher.send(ser("OPEN 2 3 "))
sendString(publisher, "1")
publisher.send("CLOSE 0 1")
publisher.send("CLOSE 0 3 1")
sendString(publisher, "2")
publisher.send("CLOSE 1 1")
publisher.send("CLOSE 1 3 1")
sendString(publisher, "3")
publisher.send("CLOSE 2 1")
publisher.send("CLOSE 2 3 1")

listener.call() shouldBe ResultState.SUCCESS

Expand All @@ -107,11 +107,11 @@ class OutputHandlerSpec extends CoreSpec {
publisher.send(ser("OPEN 0 3 "))
publisher.send(ser("OPEN 2 3 "))
sendString(publisher, "1")
publisher.send("CLOSE 0 1")
publisher.send("CLOSE 0 3 1")
sendString(publisher, "2")
publisher.send("CLOSE 1 1")
publisher.send("CLOSE 1 3 1")
sendString(publisher, "3")
publisher.send("CLOSE 2 1")
publisher.send("CLOSE 2 3 1")

listener.call() shouldBe ResultState.TRIGGERED

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class OutputPublisherSpec extends CoreSpec {
}

it should "send a close message" in new OutputPublisherCase {
publisher.sendClose(2)
publisher.sendClose(2, 2)
val record = subscriber.recv()
MessageType.getMessageType(record) shouldBe MessageType.CLOSE
close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class RunnerSpec extends CoreSpec {
sendString(publisher, "1")
sendString(publisher, "2")
sendString(publisher, "3")
publisher.send("CLOSE 0 3")
publisher.send("CLOSE 0 1 3")

publisher.close()
}
Expand Down Expand Up @@ -86,11 +86,11 @@ class RunnerSpec extends CoreSpec {
publisher.send(ser("OPEN 1 3 "))
publisher.send(ser("OPEN 2 3 "))
sendString(publisher, "1")
publisher.send("CLOSE 0 1")
publisher.send("CLOSE 0 3 1")
sendString(publisher, "2")
publisher.send("CLOSE 1 1")
publisher.send("CLOSE 1 3 1")
sendString(publisher, "3")
publisher.send("CLOSE 2 1")
publisher.send("CLOSE 2 3 1")

publisher.close()
}
Expand Down Expand Up @@ -119,11 +119,11 @@ class RunnerSpec extends CoreSpec {
publisher.send(ser("OPEN 0 2 "))
publisher.send(ser("OPEN 1 2 "))
sendString(publisher, "1")
publisher.send("CLOSE 0 1")
publisher.send("CLOSE 0 3 1")
sendString(publisher, "2")
publisher.send("CLOSE 1 1")
publisher.send("CLOSE 1 3 1")
sendString(publisher, "3")
publisher.send("CLOSE 2 1")
publisher.send("CLOSE 2 3 1")

publisher.close()
}
Expand All @@ -149,7 +149,7 @@ class RunnerSpec extends CoreSpec {
publisher.send(ser("OPEN 0 2 "))
publisher.send(ser("OPEN 1 2 "))
sendString(publisher, "1")
publisher.send("CLOSE 0 1")
publisher.send("CLOSE 0 2 1")
sendString(publisher, "2")
sendString(publisher, "3")
Thread.sleep(1000)
Expand Down Expand Up @@ -180,7 +180,7 @@ class RunnerSpec extends CoreSpec {
publisher.send(ser("OPEN 0 2 "))
publisher.send(ser("OPEN 1 2 "))
sendString(publisher, "1")
publisher.send("CLOSE 0 1")
publisher.send("CLOSE 0 2 1")
sendString(publisher, "2")
sendString(publisher, "3")
Thread.sleep(2000)
Expand Down Expand Up @@ -219,7 +219,7 @@ class RunnerSpec extends CoreSpec {
publisher.send(ser("OPEN 0 2 "))
publisher.send(ser("OPEN 1 2 "))
sendString(publisher, "1")
publisher.send("CLOSE 0 1")
publisher.send("CLOSE 0 2 1")
sendString(publisher, "2")
sendString(publisher, "3")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,15 @@ public void configure(Configuration configuration) {
}

@Override
public void open(int taskNumber, int numTasks) throws IOException {
public void open(int taskNumber, int numTasks) {
this.taskNumber = taskNumber;
this.numTasks = numTasks;
//open a socket to push data
handler = new OutputPublisher(instance, ringBuffer);
}

@Override
public void writeRecord(IN next) throws IOException {
public void writeRecord(IN next) {
byte[] msg;
if (serializer == null) {
//startWith serializer
Expand Down Expand Up @@ -93,6 +93,6 @@ public void writeRecord(IN next) throws IOException {
@Override
public void close() throws IOException {
//signal close to output receiver
handler.sendClose(taskNumber);
handler.sendClose(taskNumber, numTasks);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.UnknownHostException;

/**
* Provides a sink that sends all incoming records using a 0MQ connection.
Expand All @@ -56,9 +56,7 @@ public TestSink(int instance, RingBuffer<OutputEvent> buffer) {


@Override
public void open(Configuration configuration) throws UnknownHostException {
String jobManagerAddress = configuration
.getString("jobmanager.rpc.address", "localhost");
public void open(Configuration configuration) {
//open a socket to push data
handler = new OutputPublisher(instance, buffer);
}
Expand All @@ -70,16 +68,15 @@ public void open(Configuration configuration) throws UnknownHostException {
* @param next incoming records
*/
@Override
public void invoke(IN next) {
public void invoke(IN next, SinkFunction.Context context) {

int numberOfSubTasks = getRuntimeContext().getNumberOfParallelSubtasks();
int indexofThisSubTask = getRuntimeContext().getIndexOfThisSubtask();
byte[] msg;
final int numberOfSubTasks = getRuntimeContext().getNumberOfParallelSubtasks();
final int indexofThisSubTask = getRuntimeContext().getIndexOfThisSubtask();

if (serializer == null) {

//startWith serializer
TypeInformation<IN> typeInfo = TypeExtractor.getForObject(next);
final TypeInformation<IN> typeInfo = TypeExtractor.getForObject(next);
serializer = typeInfo.createSerializer(getRuntimeContext().getExecutionConfig());
//push serializer to output receiver
try {
Expand Down Expand Up @@ -109,7 +106,8 @@ public void invoke(IN next) {
public void close() {
//signal close to output receiver
handler.sendClose(
getRuntimeContext().getIndexOfThisSubtask());
getRuntimeContext().getIndexOfThisSubtask(),
getRuntimeContext().getNumberOfParallelSubtasks());

}

Expand Down

0 comments on commit 26ad23e

Please sign in to comment.