I have a Pig program in which I am trying to compute the nearest (minimum-distance) center between two bags. To get it to work, I found I need to COGROUP the bags into a single dataset, and the entire operation takes a long time. I want either to open one of the bags from disk within the UDF, or to pass another relation into the UDF without needing to COGROUP.
Code:
-- **** Load files for iteration ****
register myudfs.jar;
wordcounts = LOAD 'input/wordcounts.txt' USING PigStorage('\t') AS (PatentNumber:chararray, word:chararray, frequency:double);
centerassignments = load 'input/centerassignments/part-*' USING PigStorage('\t') AS (PatentNumber: chararray, oldCenter: chararray, newCenter: chararray);
kcenters = LOAD 'input/kcenters/part-*' USING PigStorage('\t') AS (CenterID:chararray, word:chararray, frequency:double);
kcentersa1 = CROSS centerassignments, kcenters;
kcentersa = FOREACH kcentersa1 GENERATE centerassignments::PatentNumber as PatentNumber, kcenters::CenterID as CenterID, kcenters::word as word, kcenters::frequency as frequency;
-- ***** Assign to nearest k-mean *******
assignpre1 = COGROUP wordcounts by PatentNumber, kcentersa by PatentNumber;
assignwork2 = FOREACH assignpre1 GENERATE group as PatentNumber, myudfs.kmeans(wordcounts, kcentersa) as CenterID;
Basically, my issue is that for each patent I need to pass both sub-relations (wordcounts, kcenters) to the UDF. To do this, I CROSS centerassignments with kcenters and then COGROUP by PatentNumber to get tuples of the form (PatentNumber, {wordcounts}, {kcenters}). If I could figure out a way to pass a relation, or to open the centers from within the UDF, then I could just GROUP wordcounts by PatentNumber and run myudfs.kmeans(wordcounts), which would hopefully be much faster without the CROSS/COGROUP.
The CROSS/COGROUP is an expensive operation. It currently takes about 20 minutes and appears to tax the CPU and RAM. I suspect it would be more efficient without the CROSS, but I'm not sure it will be faster, so I'd like to experiment.
It looks like calling the load functions from within Pig requires a PigContext object, which I don't get from an EvalFunc. To use the Hadoop file system I also need some initial objects, and I don't see how to get them. So my question is: how can I open a file on the Hadoop file system from within a Pig UDF? I also run the UDF via main for debugging, so I need to load from the normal filesystem when in debug mode.
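For the debug (local filesystem) half of this, here is a minimal sketch of the kind of loader I have in mind. It assumes the kcenters files keep the same tab-separated layout as in the LOAD statement above (CenterID, word, frequency). The class and method names (CenterLoader, parse, loadLocal) are hypothetical and not part of my actual UDF, and it does not solve the Hadoop side of the question.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: reads "CenterID<TAB>word<TAB>frequency" lines
// into a map of CenterID -> (word -> frequency).
class CenterLoader {

    // Parses from any Reader, so the source of the characters
    // (local file, in-memory string, some other stream) stays pluggable.
    static Map<String, Map<String, Double>> parse(Reader source) throws IOException {
        Map<String, Map<String, Double>> centers = new HashMap<>();
        try (BufferedReader in = new BufferedReader(source)) {
            String line;
            while ((line = in.readLine()) != null) {
                if (line.isEmpty()) {
                    continue; // skip blank lines
                }
                String[] fields = line.split("\t");
                if (fields.length != 3) {
                    throw new IOException("Expected 3 tab-separated fields: " + line);
                }
                centers.computeIfAbsent(fields[0], k -> new HashMap<>())
                       .put(fields[1], Double.parseDouble(fields[2]));
            }
        }
        return centers;
    }

    // Debug-mode entry point: read a single file from the normal filesystem.
    static Map<String, Map<String, Double>> loadLocal(String path) throws IOException {
        return parse(Files.newBufferedReader(Paths.get(path), StandardCharsets.UTF_8));
    }
}
```

Keeping the parsing behind a Reader means only the code that opens the stream would need to change between debug mode and a run on the cluster.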
An even better option would be a way to pass a relation into a UDF without needing to CROSS/COGROUP. This would be ideal, particularly if the relation resides in memory, i.e. being able to call myudfs.kmeans(wordcounts, kcenters) without needing the CROSS/COGROUP with kcenters.
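To make the goal concrete: if the centers were available in memory, the per-patent work would reduce to something like the sketch below. This is not my actual myudfs.kmeans code. It assumes squared Euclidean distance over sparse word-frequency vectors, and the class and method names (NearestCenter, squaredDistance, nearest) are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of nearest-center assignment for one patent.
class NearestCenter {

    // Squared Euclidean distance between two sparse vectors (word -> frequency).
    // A word missing from one vector is treated as frequency 0.
    static double squaredDistance(Map<String, Double> a, Map<String, Double> b) {
        double sum = 0.0;
        for (Map.Entry<String, Double> e : a.entrySet()) {
            double diff = e.getValue() - b.getOrDefault(e.getKey(), 0.0);
            sum += diff * diff;
        }
        for (Map.Entry<String, Double> e : b.entrySet()) {
            if (!a.containsKey(e.getKey())) {
                sum += e.getValue() * e.getValue();
            }
        }
        return sum;
    }

    // Returns the ID of the center closest to the document, or null if there are no centers.
    static String nearest(Map<String, Double> doc, Map<String, Map<String, Double>> centers) {
        String best = null;
        double bestDistance = Double.POSITIVE_INFINITY;
        for (Map.Entry<String, Map<String, Double>> center : centers.entrySet()) {
            double distance = squaredDistance(doc, center.getValue());
            if (distance < bestDistance) {
                bestDistance = distance;
                best = center.getKey();
            }
        }
        return best;
    }

    public static void main(String[] args) {
        Map<String, Double> doc = new HashMap<>();
        doc.put("widget", 2.0);

        Map<String, Double> c1 = new HashMap<>();
        c1.put("widget", 1.5);
        Map<String, Double> c2 = new HashMap<>();
        c2.put("gadget", 4.0);

        Map<String, Map<String, Double>> centers = new HashMap<>();
        centers.put("c1", c1);
        centers.put("c2", c2);

        System.out.println(nearest(doc, centers));
    }
}
```

The point is that the centers map would be built once and reused for every patent, instead of being replicated per patent through the CROSS.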
But the basic idea is to trade IO for RAM/CPU cycles.
Any help will be much appreciated. Pig UDFs aren't well documented beyond the simplest ones, even in the UDF manual.