Thursday 7 March 2013

Should a function handle its own threading?

G'day:
One of the UDFs I was triaging for CFLib was arrayDiff(), which takes two arrays and removes from the first array any elements that also appear in the second. For example, if we called this code:

result = arrayDiff(["whero", "karaka", "kakariki"], ["karaka"]);

then result would be ["whero", "kakariki"].
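For reference, the core operation on its own is tiny. Here's a minimal sketch in Java rather than CFML (the CFML later in this post calls removeAll(), subList() and addAll() directly on its arrays, so the java.util.List semantics are the same). The ArrayDiff class is just a hypothetical wrapper for illustration:

```java
import java.util.ArrayList;
import java.util.List;

class ArrayDiff {
    // Returns a new list holding the elements of array1 that do not appear in array2.
    // Neither input is modified, and any repeats in array1 are left alone.
    static <T> List<T> arrayDiff(List<T> array1, List<T> array2) {
        List<T> result = new ArrayList<>(array1); // copy, so the caller's list is untouched
        result.removeAll(array2);                 // drop every element equal to one in array2
        return result;
    }

    public static void main(String[] args) {
        List<String> result = arrayDiff(List.of("whero", "karaka", "kakariki"), List.of("karaka"));
        System.out.println(result); // [whero, kakariki]
    }
}
```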

This is a fairly useful sort of notion.


However, the function also provided the option of multi-threading the processing. One could pass in a number of threads to use: the first array would be chunked into that many parts, the second array's elements would be removed from each chunk, and the results would be recombined and returned. The thinking here was obviously that for large arrays the removal process was quite slow (and it was), so processing several chunks simultaneously would speed up the process as a whole.
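The chunking arithmetic is the fiddly part, so here's a sketch of just that bit, again in Java. chunkRanges() is a hypothetical helper, not part of the submitted UDF. It assumes chunkSize = ceiling(length / threads) and zero-based, half-open ranges [start, end), which is what java.util.List's subList(fromIndex, toIndex) expects. Looping once per thread, and stopping as soon as a chunk reaches the end of the array, covers every element exactly once:

```java
import java.util.ArrayList;
import java.util.List;

class Chunking {
    // Splits the index range [0, length) into at most `threads` contiguous, half-open
    // ranges {start, end}, each suitable for list.subList(start, end).
    static List<int[]> chunkRanges(int length, int threads) {
        List<int[]> ranges = new ArrayList<>();
        if (length == 0 || threads < 1) {
            return ranges; // nothing to split, or no threads to split it across
        }
        int chunkSize = (length + threads - 1) / threads; // integer ceiling(length / threads)
        for (int i = 0; i < threads; i++) {               // at most one chunk per thread
            int start = i * chunkSize;
            int end = Math.min(length, start + chunkSize);
            ranges.add(new int[] {start, end});
            if (end == length) {
                break; // reached the end: don't create empty chunks
            }
        }
        return ranges;
    }

    public static void main(String[] args) {
        for (int[] range : chunkRanges(10, 4)) {
            System.out.println(range[0] + ".." + range[1]);
        }
    }
}
```

For 10 elements and 4 threads that gives the chunks 0..3, 3..6, 6..9 and 9..10; for 3 elements and 5 threads it stops after three one-element chunks rather than starting threads with nothing to do.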

Considering the entire process, I can see why one might want to do this. However, I think the approach taken was incorrect. I think a function should not concern itself with performance considerations that might arise from the size of its inputs; it should just focus on the task at hand. I think it's the job of the calling code to deal with that sort of consideration. Basically, a function called arrayDiff() should focus on diffing the arrays, and the calling code should busy itself with how arrayDiff() is called (be it in one thread or multiple threads).
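To illustrate the calling code owning that decision, here's a rough Java sketch. diffInParallel() is a hypothetical name; ExecutorService, Executors and Future are standard java.util.concurrent classes. arrayDiff() stays a plain single-threaded function, and the caller splits the first array, runs arrayDiff() on each chunk in a thread pool, and stitches the results back together in chunk order:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class CallerThreading {
    // The function itself knows nothing about threads.
    static <T> List<T> arrayDiff(List<T> array1, List<T> array2) {
        List<T> result = new ArrayList<>(array1);
        result.removeAll(array2);
        return result;
    }

    // The calling code decides to spread the work across `threads` threads.
    static <T> List<T> diffInParallel(List<T> array1, List<T> array2, int threads)
            throws InterruptedException, ExecutionException {
        int chunkSize = Math.max(1, (array1.size() + threads - 1) / threads); // integer ceiling
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<List<T>>> futures = new ArrayList<>();
            for (int start = 0; start < array1.size(); start += chunkSize) {
                List<T> chunk = array1.subList(start, Math.min(array1.size(), start + chunkSize));
                futures.add(pool.submit(() -> arrayDiff(chunk, array2)));
            }
            List<T> combined = new ArrayList<>();
            for (Future<List<T>> future : futures) {
                combined.addAll(future.get()); // waits for each chunk in turn, keeping chunk order
            }
            return combined;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> a1 = List.of("whero", "karaka", "kakariki", "karaka", "pango");
        System.out.println(diffInParallel(a1, List.of("karaka"), 2)); // [whero, kakariki, pango]
    }
}
```

This assumes threads is at least 1 (Executors.newFixedThreadPool() rejects anything less), and that nothing modifies array1 while its subList() views are being read.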

Also, let's have a look at the code. The version below is not the one submitted to CFLib: I've tidied it up, improved some of the logic, and fixed a coupla bugs in the first version:

array function arrayDiff(required array array1, required array array2, numeric threads=0, boolean removeDuplicates=true){
    var maxThreadLoop    = threads;
    var array1Len        = arrayLen(array1);
    var diffedArray        = [];

    if (threads > 0){
        var chunkSize        = ceiling(array1Len / maxThreadLoop);
        var threadName        = "";
        var threadNames        = [];
        var threadCount        = 0;
        var thisId            = "";
        for (threadCount=0; threadCount < maxThreadLoop; threadCount++){
            var startIdx    = threadCount * chunkSize;
            var endIdx        = min(array1Len, startIdx + chunkSize);
            threadName = createUuid();
            arrayAppend(threadNames, threadName);

            thread action="run" name=threadName startIdx=startIdx endIdx=endIdx array1=array1 array2=array2 {
                thread.chunk = duplicate(attributes.array1.subList(attributes.startIdx, attributes.endIdx));
                thread.chunk.removeAll(attributes.array2);
            }

            // If we're at the end of the array, break out of the outer loop rather than do empty cfthread calls
            if (endIdx == array1Len){
                break;
            }
        }
        // Join the threads after they have completed
        thread action="join" name=arrayToList(threadNames);

        for (threadName in threadNames){
            // Combine the arrays using java
            diffedArray.addAll(cfthread[threadName].chunk);
        }

    }else{
        // No threading
        diffedArray = duplicate(array1);
        diffedArray.removeAll(array2);
    }

    // Remove any duplicates
    if (removeDuplicates){
        // LinkedHashSet, unlike HashSet, keeps the elements in their original order
        diffedArray = createObject("java", "java.util.LinkedHashSet").init(diffedArray).toArray();
    }
    return diffedArray;
}

The code breaks down into four areas:
  • the basic skeleton of a function (the signature, the return statement and the closing brace);
  • the bits that actually perform the array-diff functionality (the two lines in the body of the thread, and the two lines in the no-threading branch);
  • the code dealing with the threading, ie: the bulk of the function;
  • additional functionality tacked onto the function (the duplicate-removal block at the end).
All up there are 48 lines of code there, and only four of them actually deal with the functionality as described on the tin: 8%. If I was being less generous I'd discount the one in the middle of the threading stuff, cos that's only there because of the threading, so one could say 6%. To me that says something is wrong: 36 lines (75%) of the function have nothing to do with what the function claims it does. Also note there's some additional functionality tacked onto the bottom of the function that really doesn't belong there: a function that removes items from an array is different from a function that dedupes said array.
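If deduping is wanted, it can be its own function, composed by the caller instead of being switched on by a boolean argument. A sketch in Java (dedupe() is a hypothetical name). It uses LinkedHashSet because that drops repeats while keeping first-seen order, whereas HashSet makes no guarantee about iteration order:

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;

class Compose {
    // Removes array2's elements from a copy of array1. Nothing else.
    static <T> List<T> arrayDiff(List<T> array1, List<T> array2) {
        List<T> result = new ArrayList<>(array1);
        result.removeAll(array2);
        return result;
    }

    // Removes repeated elements, keeping the first occurrence of each. Nothing else.
    static <T> List<T> dedupe(List<T> list) {
        return new ArrayList<>(new LinkedHashSet<>(list));
    }

    public static void main(String[] args) {
        List<String> a1 = List.of("whero", "kakariki", "karaka", "whero");
        // The caller chooses whether to dedupe the diffed result.
        System.out.println(dedupe(arrayDiff(a1, List.of("karaka")))); // [whero, kakariki]
    }
}
```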

What do you think? Am I on the right track, or am I on crack?

Anyway, I rejected the function as it stood.

However, it got me thinking about the whole threading business, and I decided there was merit in the idea of chunking data processing into threads, and also merit in the mooted arrayDiff() functionality. But I wanted to decouple them.

Yesterday morning before work I bashed this code out as a proof of concept:

public any function threadJob(required function onRun, numeric threads=1, any data, function onJoin){
    var threadNames = [];
    var threadName    = "";

    for (var i=1; i <= threads; i++){
        threadName = createUuid();
        arrayAppend(threadNames, threadName);
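        thread action="run" name=threadName index=i threads=threads data=data onRun=onRun {
            // cfthread attributes arrive in the thread's attributes scope
            attributes.onRun(data=attributes.data, threadMetadata={current=attributes.index, total=attributes.threads}, thread=thread);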
        }
    }

    if (structKeyExists(arguments, "onJoin")){
        thread action="join" name=arrayToList(threadNames);
        var result = onJoin(data=cfthread, threadNames=threadNames);
    }

    if (structKeyExists(local, "result")){
        return result;
    }
}

This code does one thing: it multi-threads the processing of some data. It takes some data, the number of threads to run, and two callbacks: one which is called in each thread, and one which is called once the threads have been joined. The latter is optional; if it's omitted, the threads aren't joined and threadJob() returns nothing.
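For comparison, a rough Java analogue of threadJob() might look like the following. It's a sketch under my own assumptions, not a port: the OnRun interface, the use of plain Thread objects, and having each thread return its result (instead of writing to a CFML-style thread scope) are all choices made for illustration. diffChunk() and concat() play the parts of an onRun handler and an onJoin handler for the array-diff case:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.Function;

class ThreadJob {
    // Per-thread callback; `current` is 1-based, mirroring threadMetadata.current/total.
    interface OnRun<D, T> {
        T run(D data, int current, int total);
    }

    // Starts `threads` threads, each calling onRun. If onJoin is non-null, joins them all
    // and returns onJoin applied to the per-thread results in thread order; otherwise
    // returns null without waiting.
    static <D, T, R> R threadJob(OnRun<D, T> onRun, int threads, D data, Function<List<T>, R> onJoin)
            throws InterruptedException {
        AtomicReferenceArray<T> results = new AtomicReferenceArray<>(threads);
        List<Thread> started = new ArrayList<>();
        for (int i = 1; i <= threads; i++) {
            final int current = i;
            Thread t = new Thread(() -> results.set(current - 1, onRun.run(data, current, threads)));
            t.start();
            started.add(t);
        }
        if (onJoin == null) {
            return null;
        }
        for (Thread t : started) {
            t.join();
        }
        List<T> ordered = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            ordered.add(results.get(i));
        }
        return onJoin.apply(ordered);
    }

    // onRun for array-diffing: diff this thread's chunk of "a1" against "a2".
    static List<String> diffChunk(Map<String, List<String>> data, int current, int total) {
        List<String> a1 = data.get("a1");
        int chunkSize = (a1.size() + total - 1) / total;            // integer ceiling
        int start = Math.min(a1.size(), (current - 1) * chunkSize); // clamp so spare threads get an empty chunk
        int end = Math.min(a1.size(), start + chunkSize);
        List<String> chunk = new ArrayList<>(a1.subList(start, end));
        chunk.removeAll(data.get("a2"));
        return chunk;
    }

    // onJoin for array-diffing: concatenate the chunks in thread order.
    static List<String> concat(List<List<String>> chunks) {
        List<String> combined = new ArrayList<>();
        for (List<String> chunk : chunks) {
            combined.addAll(chunk);
        }
        return combined;
    }

    public static void main(String[] args) throws InterruptedException {
        Map<String, List<String>> data = Map.of(
            "a1", List.of("whero", "karaka", "kakariki"),
            "a2", List.of("karaka"));
        List<String> result = threadJob(ThreadJob::diffChunk, 2, data, ThreadJob::concat);
        System.out.println(result); // [whero, kakariki]
    }
}
```

As with the CFML version, passing a null onJoin means the threads are not waited on and nothing useful is returned.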

Examples of the callbacks are:

function onRunHandler(required struct data, required struct threadMetadata, required struct thread){
    var array1Len    = arrayLen(data.a1);
    var chunkSize    = ceiling(array1Len / threadMetadata.total);
    var startIdx    = (threadMetadata.current-1) * chunkSize;
    var endIdx        = min(array1Len, startIdx + chunkSize);
    if (startIdx <= endIdx){
        thread.result = duplicate(data.a1.subList(startIdx, endIdx));
        thread.result.removeAll(data.a2);
    }else{
        thread.result = [];
    }
}

function onJoinHandler(required struct data, required array threadNames){
    var result = [];
    for (var threadName in threadNames){
        result.addAll(data[threadName].result);
    }
    return result;
}

And putting it together:

result = threadJob(onRun=onRunHandler, threads=1, data={a1=["whero", "karaka", "kakariki"], a2=["karaka"]}, onJoin=onJoinHandler);

This does the same thing as arrayDiff() did (except without the extraneous deduping code). Interestingly: it's about the same amount of code.

I think this approach is a nice separation of concerns, and accordingly the code is much easier to follow.

And now it's time to get back to my day job. As always: thoughts and opinions welcomed.

Cheers.

--
Adam