Sunday, May 2, 2021

Multithreading for Experienced Professionals

Problem 1: I have a large text file (roughly 4.5 GB in this case), and I need to process the entire file as fast as possible.


When the data reaches the output stage, it either needs to be sorted into the correct order or it needs to already be in the correct order.

It is better to do this with ordered tasks:

class OrderedTask implements Comparable<OrderedTask> {

    private final Integer index;
    private final String line;

    public OrderedTask(Integer index, String line) {
        this.index = index;
        this.line = line;
    }


    @Override
    public int compareTo(OrderedTask o) {
        return index < o.getIndex() ? -1 : index == o.getIndex() ? 0 : 1;
    }

    public Integer getIndex() {
        return index;
    }

    public String getLine() {
        return line;
    }    
}

As the output queue, you can use your own queue backed by a priority queue:

/**
 * Re-sequencing queue: producers may {@link #put} tasks in any order, while
 * {@link #take} returns them strictly in index order, starting at {@code startIndex}.
 *
 * <p>All state is guarded by a single {@link ReentrantLock}; producers and the consumer
 * wait on the same {@link Condition}, so every state change signals all waiters.
 */
class OrderedTaskQueue {

    private final ReentrantLock lock;
    private final Condition waitForOrderedItem;
    private final int maxQueueSize;
    private final PriorityQueue<OrderedTask> backedQueue;

    /** Index of the next task {@link #take()} will hand out. */
    private int expectedIndex;

    /**
     * @param maxQueueSize once this many tasks are buffered, {@link #put} blocks every
     *                     task except the one {@link #take} is waiting for
     * @param startIndex   index of the first task {@link #take} will return
     * @throws IllegalArgumentException if {@code maxQueueSize < 1}
     */
    public OrderedTaskQueue(int maxQueueSize, int startIndex) {
        if (maxQueueSize < 1) {
            throw new IllegalArgumentException("maxQueueSize must be >= 1, was " + maxQueueSize);
        }
        this.maxQueueSize = maxQueueSize;
        this.expectedIndex = startIndex;
        // The expected task is admitted even when the queue is full, so it can grow
        // past maxQueueSize; size the backing array with some headroom.
        this.backedQueue = new PriorityQueue<>(2 * this.maxQueueSize);

        this.lock = new ReentrantLock();
        this.waitForOrderedItem = this.lock.newCondition();
    }

    /**
     * Adds a task, blocking while the queue is full unless this is the task that
     * {@link #take} is waiting for.
     *
     * @return the result of adding to the backing priority queue
     * @throws RuntimeException if interrupted while waiting; the thread's interrupt
     *                          flag is restored and the cause is attached
     */
    public boolean put(OrderedTask item) {
        ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // Letting the expected task through even when full guarantees progress:
            // otherwise a full queue of later tasks could block it forever.
            while (this.backedQueue.size() >= maxQueueSize && item.getIndex() != expectedIndex) {
                this.waitForOrderedItem.await();
            }

            boolean result = this.backedQueue.add(item);
            this.waitForOrderedItem.signalAll();
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while waiting to put task " + item.getIndex(), e);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the task with index {@code expectedIndex}, blocking until it
     * has been put, then advances {@code expectedIndex}.
     *
     * @throws RuntimeException if interrupted while waiting; the thread's interrupt
     *                          flag is restored and the cause is attached
     */
    public OrderedTask take() {
        ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // The head of the priority queue is the smallest index; wait until it is
            // exactly the one we need next.
            while (this.backedQueue.peek() == null || this.backedQueue.peek().getIndex() != expectedIndex) {
                this.waitForOrderedItem.await();
            }
            OrderedTask result = this.backedQueue.poll();
            expectedIndex++;
            // Wake producers that may now fit, or that hold the new expected index.
            this.waitForOrderedItem.signalAll();
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while waiting for task " + expectedIndex, e);
        } finally {
            lock.unlock();
        }
    }
}

startIndex is the index of the first ordered task. maxQueueSize limits how many out-of-order tasks are buffered: once it is reached, put() blocks every task except the one take() is waiting for, so memory does not fill up while an earlier task is still being processed. It should be two to three times the number of processing threads, so that processing is not stalled immediately and the pipeline can still scale.

Then create your tasks:

// Tag each line with its position so the output stage can restore file order.
int indexOrder = 0;
while ((line = reader.readLine()) != null) {
    inputQueue.put(new OrderedTask(indexOrder++, line));
}

Line-by-line tasks are used here only to match your example; in practice you should change OrderedTask to carry a batch of lines.

Main thread:

// Completion flags shared by the pipeline threads; declare them at class level.
// volatile makes a flag written by one thread visible to the others.
static volatile boolean readerFinished; // set by Input when it stops reading
static volatile boolean writerFinished; // set by Output when it finishes

/**
 * Runs the read → process → write pipeline over {@code test.txt}, writing the result to
 * {@code outputTest.txt}, and returns once the output thread has terminated.
 *
 * @throws IOException if either file cannot be opened or closed
 */
private void initialise() throws IOException {
    BlockingQueue<String> inputQueue = new LinkedBlockingQueue<>(1_000_000);
    BlockingQueue<String> outputQueue = new LinkedBlockingQueue<>(1_000_000); // capacity 1 million.

    String inputFileName = "test.txt";
    String outputFileName = "outputTest.txt";

    // try-with-resources closes both files even if opening the writer or starting a
    // thread fails. Closing the writer also flushes whatever the Output thread left in
    // its buffer.
    // NOTE: FileReader/FileWriter use the platform default charset.
    try (BufferedReader reader = new BufferedReader(new FileReader(inputFileName));
         BufferedWriter writer = new BufferedWriter(new FileWriter(outputFileName))) {

        Thread inputThread = new Thread(new Input(reader, inputQueue));
        Thread processingThread = new Thread(new Processing(inputQueue, outputQueue));
        Thread outputThread = new Thread(new Output(writer, outputQueue));

        inputThread.start();
        processingThread.start();
        outputThread.start();

        // The files must stay open until the writer is done with them.
        awaitTermination(outputThread);
    }

    System.out.println("Exited.");
}

/**
 * Blocks until {@code thread} has terminated. An interrupt does not cut the wait
 * short; it is re-asserted on the current thread once the wait is over.
 */
private static void awaitTermination(Thread thread) {
    boolean interrupted = false;
    while (true) {
        try {
            thread.join();
            break;
        } catch (InterruptedException e) {
            interrupted = true;
        }
    }
    if (interrupted) {
        Thread.currentThread().interrupt();
    }
}

Input thread (please forgive any commented-out debug code; I used it to make sure the reader thread was actually running properly):

/**
 * Reader stage: copies every line of {@code reader} into {@code inputQueue}, then
 * enqueues the poison pill that marks the end of input, then sets readerFinished.
 */
class Input implements Runnable {
    BufferedReader reader;
    BlockingQueue<String> inputQueue;

    Input(BufferedReader reader, BlockingQueue<String> inputQueue) {
        this.reader = reader;
        this.inputQueue = inputQueue;
    }

    /**
     * A read error ends the input early but the poison pill is still sent, so the
     * downstream threads terminate instead of waiting forever for it. On interrupt the
     * thread stops enqueuing and restores its interrupt flag. readerFinished is set
     * in every case, after the pill has been enqueued.
     */
    @Override
    public void run() {
        String poisonPill = "ChH92PU2KYkZUBR";

        try {
            copyLinesToQueue();
            inputQueue.put(poisonPill);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            readerFinished = true;
        }
    }

    /** Enqueues every remaining line; an I/O error is reported and ends the input early. */
    private void copyLinesToQueue() throws InterruptedException {
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                inputQueue.put(line);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Processing thread (normally this would actually do something to each line, but for the purposes of this mock-up it immediately pushes the line to the output thread). If necessary, we can simulate work by making the thread sleep briefly for each line.

/**
 * Processing stage: moves lines from {@code inputQueue} to {@code outputQueue}. The
 * poison pill is forwarded like any other line so the output stage sees it, and this
 * thread then stops.
 */
class Processing implements Runnable {
    BlockingQueue<String> inputQueue;
    BlockingQueue<String> outputQueue;

    Processing(BlockingQueue<String> inputQueue, BlockingQueue<String> outputQueue) {
        this.inputQueue = inputQueue;
        this.outputQueue = outputQueue;
    }

    /**
     * Termination does not rely on "input empty and reader finished" alone. The reader
     * sets readerFinished only after enqueuing the pill, so checking that first and then
     * calling take() could block forever once the pill had been consumed. Instead, the
     * loop stops right after forwarding the pill. A timed poll also lets it exit if the
     * reader finished without sending a pill, or if another processing thread took it.
     */
    @Override
    public void run() {
        try {
            while (true) {
                String line = inputQueue.poll(100, java.util.concurrent.TimeUnit.MILLISECONDS);

                if (line == null) {
                    if (readerFinished && inputQueue.isEmpty()) {
                        break;
                    }
                    continue;
                }

                // Real per-line processing would happen here.
                outputQueue.put(line);

                if (line.equals("ChH92PU2KYkZUBR")) {
                    break; // end of input has been handed to the writer
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

Output thread:

class Output implements Runnable {
    BufferedWriter writer;
    BlockingQueue<String> outputQueue;

    Output(BufferedWriter writer, BlockingQueue<String> outputQueue) {
        this.writer = writer;
        this.outputQueue = outputQueue;
    }

    @Override
    public void run() {
        String line;
        ArrayList<String> outputList = new ArrayList<>();

        while (true) {
            try {
                line = outputQueue.take();

                if (line.equals("ChH92PU2KYkZUBR")) {
                    for (String outputLine : outputList) {
                        writer.write(outputLine);
                    }
                    System.out.println("Writer finished - executing termination");

                    writerFinished = true;
                    break;
                }

                line += "\n";
                outputList.add(line);

                if (outputList.size() == 500_000) {
                    for (String outputLine : outputList) {
                        writer.write(outputLine);
                    }
                    System.out.println("Writer wrote batch");
                    outputList = new ArrayList<>();
                }
            } catch (IOException | InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}


No comments:

Post a Comment

Top DataStructures Problem from Medium-2

  Array: Find a pair with the given sum in an array Maximum Sum Subarray Problem (Kadane’s Algorithm) Longest Increasing Subsequence Problem...