در اینجا با یک نمونه کارکرد مدل نگاشتبازهازش را نشان میدهیم. در این نمونه میخواهیم میانگین سن مردان و زنان یک کشور را بدست آوریم. این کار را بسیار نزدیک به چارچوب نگاشتبازهازش هدوپ با رابه[=استریم]های جاوا انجام میدهیم.
در این نمونه برای همانندسازی گرههای هدوپ (ماشینهای جداگانه) که نگاشت یا بازهازش در آنها انجام میگیرد از ریسمانها بهره میبریم؛ گویی هر ریسمان یک گره هدوپ است. ریسمانها را همزمان بکار میاندازیم و پراسو هریک بخشی از داده را پردازش میکنند.
درونداد[=ورودی]
دادههای هر شهر در فایلی به دیسار[=فرمت] 'شناسه، ژانه[=جنسیت]، زادروز' هستند:
100000, FEMALE, 1981-02-10
100001, MALE, 1973-02-08
100002, FEMALE, 1991-07-29
100003, MALE, 2006-07-26
100004, MALE, 1953-08-19
100005, FEMALE, 2012-07-08
. . .
. . .
کلاسهای نگهداشت و ترابرد داده
public enum Gender {
FEMALE, MALE
}
@Data
public class Person { // کلاس درونداد
private String id;
private Gender gender;
private LocalDate dob;
}
@Data
public class GenderAge { // کلاس کلید/ارزش میانجی
private Gender gender;
private Long age;
}
کلاس نگاشتگر
در نگرید به خت ۱۵
public class MapNode extends Thread {
private final String dataFile;
private List output;
public MapNode(String dataFile) {
this.dataFile = dataFile;
}
@Override
public void run() {
Stream fileStream = Files.lines(Path.of(dataFile));
Stream inStream = fileStream.map(s -> new Person(s.split(",")[0], Gender.valueOf(s.split(",")[1]), LocalDate.parse(s.split(",")[2])));
Stream outStream = inStream.
map(p -> new GenderAge(p.getGender(), (long) Period.between(p.getDob(), LocalDate.now()).getYears()));
output = outStream.collect(Collectors.toList());
}
public List getOutput() {
return output;
}
}
کلاس بازهازشگر
در نگرید به خت ۱۸
public class ReduceNode extends Thread {
private final Gender gender;
private List input;
private GenderAge sum;
public ReduceNode(Gender gender) {
this.gender = gender;
}
public void setInput(List inputStream) {
this.input = inputStream;
}
@Override
public void run() {
sum = input.stream().
reduce(new GenderAge(gender, 0l), (a, b) -> new GenderAge(a.getGender(), a.getAge() + b.getAge()));
}
public float getAverage() {
if (input.isEmpty()) {
return 0f;
}
return sum.getAge().floatValue() / input.size();
}
}
نگاشت
در گام نگاشت دادهی هر شهر که در فایل جداگانهای نوشته شده است خوراک یک نگاشتگر خواهد شد. برای این نیاز است که به شمار شهرها نگاشتگر بسازیم:
List mappers = new ArrayList(NUMBER_OF_CITIES);
for (int i = 0; i < NUMBER_OF_CITIES; i++) {
MapNode mapNode = new MapNode(String.format("City_%02d", i));
mappers.add(mapNode);
}
ExecutorService es = Executors.newCachedThreadPool();
for (MapNode mapper : mappers) {
es.execute(mapper);
}
es.shutdown();
while (!es.awaitTermination(60, TimeUnit.SECONDS));
پخشکردن (Shuffling)
در این فراروند، دادههایی که در گام پیش (گام نگاشت) به دیسهی میانجی در آمدهاند، برپایه کلیدشان جداسازی میشوند، دادههای با کلید همسان باهم یکجا میشوند و به بازهازشگر پیونددار[=مربوطه] فرستاده میشوند.
Map<Gender, List<GenderAge>> shuffled = new HashMap();
for (MapNode mapper : mappers) {
List<GenderAge> listF = new LinkedList();
List<GenderAge> listM = new LinkedList();
mapper.getOutput().stream().forEach(item -> ((item.getGender() == Gender.FEMALE) ? listF : listM).add(item));
shuffled.get(Gender.FEMALE).addAll(listF);
shuffled.get(Gender.MALE).addAll(listM);
}
بازهازش (Reduce)
برای هر کلید یک گره بازهازش ساخته و با دادههای گام پیش آنرا اجرا میکنیم
ReduceNode reduceNodeF = new ReduceNode(Gender.FEMALE);
ReduceNode reduceNodeM = new ReduceNode(Gender.MALE);
reduceNodeF.setInput(shuffled.get(Gender.FEMALE));
reduceNodeM.setInput(shuffled.get(Gender.MALE));
es = Executors.newCachedThreadPool();
es.execute(reduceNodeF);
es.execute(reduceNodeM);
es.shutdown();
while (!es.awaitTermination(5, TimeUnit.SECONDS));
برونداد
System.out.printf("Female Avg: %.1f\n", reduceNodeF.getAverage());
System.out.printf("Male Avg: %.1f\n", reduceNodeM.getAverage());