Ну от нарешті в мене знайшлось трохи вільного часу, щоб представити вам реалізацію моделі Master/Worker для кластерних аплікацій, використовуючи Terracotta сервер та його інтеграційний модуль
tim-masterworker (це підмодуль модуля
tim-messaging).
Насамперед поставимо наступну задачу, для якої застосуємо модель Master/Worker, а також розподіл однієї великої роботи на кілька дрібніших і їх паралельне виконання на кількох машинах під управлінням Terracotta сервера.
Отже задача, нехай маємо перших N цілих чисел (N=2000000), користувач вводить число k, 0<=k<=N, потрібно знайти всі числа (їх кількість), які діляться на число k без остачі.
Зразу хочу звернути вашу увагу на те, що приклад зроблено не зовсім оптимально і так як би мало бути в ідеалі, оскільки я з Terracotta працюю не довше, ніж місяць, і хотів показати як працювати з Terracotta root-objects (шарені кластерні дані). Крім того, я буду використовувати міксовану техніку конфігурації: анотації + tc-config.xml.
Даний приклад не є шаблоном, але більша його частина може бути взята за основу для розробки власних рішень.
Я не буду детально описувати що таке Terracotta, root-object, instrumented-class, TIM та інші аспекти, оскільки про це все можна прочитати у відповідній літературі(в розділі документації на нашому форумі є посилання на
книжку по Terracotta, а про модель Master/Worker можна почитати в 11 розділі цієї
книги).
Розроблена аплікація буде працювати лише в кластерному режимі і вимагає, щоб був запущений Terracotta сервер, один Master та один Worker. В звичайному режимі дана аплікація працювати не буде.
Отже, повернемось до нашої задачі.
Для початку опишемо наш глобальний клас, який назвемо
ApplicationController - в цьому класі міститимуться зашарені кластерні поля та об'єкти, константи, зашарений кластерний lock-об'єкт та глобальні методи (все це документовано JavaDoc-ами та коментарями):
КОД
package com.devua.search.global;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.terracotta.modules.annotations.Root;
import com.devua.search.service.SearchService;
import com.devua.search.service.impl.SearchServiceImpl;
/**
* Contains all fields used by Master/Worker and global application methods.
*
* @author YDrozhdzhal
*
*/
public class ApplicationController {
/**
* Shared topology name between master and worker.
*/
public static final String TOPOLOGY_NAME = "SEARCH_MASTER_WORKER_TOPOLOGY";
/**
* Size of queue.
*/
public static final int QUEUE_SIZE = 1000;
/**
* Master node id hash code value.
*/
@Root
private int masterNodeIdHashCode;
/**
* Shared clustered lock.
*/
@Root
private static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
/**
* This class is singleton and this variable store single instance of this class.
*/
private static ApplicationController instance;
/**
* Search service instance.
*/
private static SearchService<Integer, Integer> searchService = new SearchServiceImpl();
/**
* Search data size.
*/
public static final int SEARCH_DATA_SIZE = 2000000;
/**
* Data for searching.
*/
@Root
private List<Integer> dataToSearch;
/**
* Private constructor.
*/
private ApplicationController() {
readWriteLock.writeLock().lock();
if (dataToSearch == null) {
dataToSearch = new ArrayList<Integer>();
initDataToSearch();
}
readWriteLock.writeLock().unlock();
}
/**
* Obtains instance of <code>AppricationController</code> class.
*
* @return instance of <code>AppricationController</code> class.
*/
public static ApplicationController getInstance() {
if (instance == null) {
instance = new ApplicationController();
}
return instance;
}
/**
* Initializes data to search.
*/
private void initDataToSearch() {
if (dataToSearch.size() <= 0) {
for (int i = 0; i < SEARCH_DATA_SIZE; i++) {
dataToSearch.add(i + 1);
}
}
}
/**
* @return the masterNodeIdHashCode
*/
public int getMasterNodeIdHashCode() {
int nodeIdHashCode;
readWriteLock.readLock().lock();
nodeIdHashCode = masterNodeIdHashCode;
readWriteLock.readLock().unlock();
return nodeIdHashCode;
}
/**
* @param masterNodeId
* the masterNodeId to set
*/
public void setMasterNodeIdHashCode(Object masterNodeId) {
readWriteLock.writeLock().lock();
this.masterNodeIdHashCode = 0;
if (masterNodeId != null) {
this.masterNodeIdHashCode = masterNodeId.toString().hashCode();
}
readWriteLock.writeLock().unlock();
}
/**
* @return the dataToSearch
*/
public List<Integer> getDataToSearch() {
List<Integer> result;
readWriteLock.readLock().lock();
result = dataToSearch;
readWriteLock.readLock().unlock();
return result;
}
/**
* Determines if specified node id is master node.
*
* @param nodeId
* Specified node id.
* @return <code>true</code> if specified node id is master node.
*/
public boolean isMasterNode(Object nodeId) {
return nodeId.toString().hashCode() == masterNodeIdHashCode;
}
/**
* @return the searchService
*/
public static SearchService<Integer, Integer> getSearchService() {
return searchService;
}
}
Даний клас є singleton. Зверніть увагу на анотацію
@Root - це означає, що дане поле є root-object, або зашареним кластерним об'єктом.
Поле
readWriteLock - є шареним кластерним локом, і оскільки воно є типу
java.util.concurrent.locks.ReentrantReadWriteLock, то всі методи, які використовують його для блокування не повинні бути описані в Terracotta locks, оскільки Terracotta автоматично знає як блокувати методи, що використовують блокування даного типу.
Поле
QUEUE_SIZE - розмір черги Worker-а - максимальна кількість об'єктів Work, яка може бути у внутрішній черзі Worker-а.
Також слід звернути увагу на метод
initDataToSearch(), який ініціалізує колекцію даних, по якій буде здійснюватись пошук - даний метод проініціалізує колекцію даних в кластері лише раз, хоча буде викликатись стільки разів, скільки буде запущено аплікацій.
Методи, які містять в назві приставку
MasterNodeIdHashCode вткористовуються для автоматичного визначення чи нода є мастером, чи нода є воркером. Зразу скажу, що дане автоматичне визначення не є досконалим і інколи вимагає перезавантажити Terracotta сервер щоб якусь ноду зробити мастером. Краще аикористовувати явне присвоєння ролей - але не це я хотів показати - хто захоче, той реалізує таку штуку. Крім того дане автоматичне визначення передбачає, що буде кілька воркерів і лише один мастер, хоча насправді мастерів теж може бути кілька.
Наступним наведу
SearchService інтерфейс:
КОД
package com.devua.search.service;
import java.util.List;
/**
* Contains methods for search service.
*
* @author YDrozhdzhal
*
* @param <SC>
* Type of search criteria.
* @param <SRI>
* Type of search result item.
*/
public interface SearchService<SC, SRI> {
/**
* Searches data using specified search criteria and index range.
*
* @param searchCriteria
* Specified search criteria.
* @param startIndex
* Specified start index.
* @param endIndex
* Specified end index.
* @return matched items.
*/
List<SRI> search(SC searchCriteria, int startIndex, int endIndex);
/**
* Obtains search data size.
*
* @return search data size.
*/
int getSearchDataSize();
}
Далі його реалізація,
SearchServiceImpl:
КОД
package com.devua.search.service.impl;
import java.util.ArrayList;
import java.util.List;
import com.devua.search.global.ApplicationController;
import com.devua.search.service.SearchService;
/**
* Implements search service interface for numbers searching task.
*
* @author YDrozhdzhal
*
*/
public class SearchServiceImpl implements SearchService<Integer, Integer> {
@Override
public List<Integer> search(Integer searchCriteria, int startIndex, int endIndex) {
List<Integer> results = new ArrayList<Integer>();
List<Integer> dataToSearch = ApplicationController.getInstance().getDataToSearch().subList(startIndex,
endIndex + 1);
for (Integer dataItem : dataToSearch) {
if ((dataItem % searchCriteria) == 0) {
results.add(dataItem);
}
}
return results;
}
@Override
public int getSearchDataSize() {
return ApplicationController.SEARCH_DATA_SIZE;
}
}
Наступним буде опис класу
SearchWork, точніше його основної частини:
КОД
package com.devua.search.masterworker;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.devua.search.global.ApplicationController;
import com.devua.search.service.SearchService;
import commonj.work.Work;
/**
* Represents search work.
*
* @author YDrozhdzhal
*
* @param <SC>
* Type of search criteria.
* @param <SRI>
* Type of search result item.
*/
public class SearchWork<SC, SRI> implements Work {
private String searchWorkId;
private Integer workId;
private Integer startIndex;
private Integer endIndex;
private List<SRI> workResults;
private SC searchCriteria;
private ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
/**
* Default constructor.
*/
public SearchWork() {
}
/**
* Full constructor.
*
* @param searchWorkId
* @param workId
* @param startIndex
* @param endIndex
* @param searchCriteria
*/
public SearchWork(String searchWorkId, Integer workId, Integer startIndex, Integer endIndex, SC searchCriteria) {
super();
this.searchWorkId = searchWorkId;
this.workId = workId;
this.startIndex = startIndex;
this.endIndex = endIndex;
this.searchCriteria = searchCriteria;
}
...
public boolean isDaemon() {
return false;
}
public void release() {
}
/**
* Runs search work.
*/
@Override
@SuppressWarnings("unchecked")
public void run() {
SearchService<SC, SRI> searchService = (SearchService<SC, SRI>) ApplicationController.getSearchService();
List<SRI> results = searchService.search(searchCriteria, startIndex, endIndex);
if (results.size() > 0) {
readWriteLock.writeLock().lock();
workResults = results;
readWriteLock.writeLock().unlock();
}
}
}
Далі клас
SearchWorker:
КОД
package com.devua.search.masterworker;
import java.util.concurrent.Executors;
import org.terracotta.jmx.util.events.ClusterEvents;
import org.terracotta.masterworker.Worker;
import org.terracotta.message.pipe.BlockingQueueBasedPipe;
import org.terracotta.message.topology.DefaultTopology;
import org.terracotta.workmanager.dynamic.DynamicWorkerFactory;
import com.devua.search.global.ApplicationController;
/**
* Represents search worker.
*
* @author YDrozhdzhal
*
*/
@SuppressWarnings("unchecked")
public class SearchWorker implements Worker {
private Worker worker;
/**
* Default constructor.
*
* @param topologyName
* Name of topology.
* @throws Exception
*/
public SearchWorker(String topologyName) throws Exception {
// Registering cluster listener for master
ClusterEvents.DefaultListener clusterListener = new ClusterListener();
ClusterEvents.registerListener(clusterListener);
try {
clusterListener.waitForRegistration();
} catch (InterruptedException ex) {
throw new IllegalStateException("Interrupted by: " + ex);
}
DynamicWorkerFactory workerFactory = new DynamicWorkerFactory(topologyName, new DefaultTopology.Factory(new BlockingQueueBasedPipe.Factory(ApplicationController.QUEUE_SIZE)), Executors.newCachedThreadPool());
worker = workerFactory.create();
}
@Override
public void start() throws Exception {
worker.start();
}
@Override
public void stop() {
worker.stop();
}
/**
* Represents listener of cluster events for master.
*
* @author YDrozhdzhal
*
*/
private class ClusterListener extends ClusterEvents.DefaultListener {
@Override
public void nodeDisconnected(Object nodeId) {
if (ApplicationController.getInstance().isMasterNode(nodeId)) {
ApplicationController.getInstance().setMasterNodeIdHashCode(null);
}
}
}
}
В цьому класі я реєструю слухача кластерних подій та створюю воркера з
tim-masterworker. Зверніть увагу на наступний рядок коду:
КОД
DynamicWorkerFactory workerFactory = new DynamicWorkerFactory(topologyName, new DefaultTopology.Factory(new BlockingQueueBasedPipe.Factory(ApplicationController.QUEUE_SIZE)), Executors.newCachedThreadPool());
а точніше на
Executors.newCachedThreadPool() - це означає, що кожний об'єкт типу
SearchWork - буде оброблятись в окремому потоці - точніше черга воркера буде опрацьовуватись кількома потоками одночасно.
Тепер розглянемо клас
SearchMaster:
КОД
package com.devua.search.masterworker;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.terracotta.jmx.util.events.ClusterEvents;
import org.terracotta.message.pipe.BlockingQueueBasedPipe;
import org.terracotta.message.routing.LoadBalancingRouter;
import org.terracotta.message.topology.DefaultTopology;
import org.terracotta.modules.annotations.HonorTransient;
import org.terracotta.workmanager.dynamic.DynamicWorkManager;
import com.devua.search.global.ApplicationController;
import com.devua.search.service.SearchService;
import commonj.work.Work;
import commonj.work.WorkEvent;
import commonj.work.WorkItem;
import commonj.work.WorkListener;
import commonj.work.WorkManager;
/**
* Represents master for search process.
*
* @author YDrozhdzhal
*
* @param <SC>
* Type of search criteria.
* @param <SRI>
* Type of search result item.
*/
@HonorTransient
public class SearchMaster<SC, SRI> implements WorkListener {
/**
* Default work timeout.
*/
private static final int WORK_TIMEOUT = 120000;
// Error codes
private static final String ERROR_CODE_PREFIX = "Error ";
private static final String ERROR_CODE_SUFFIX = ": ";
private static final String SEARCH_ERROR_CODE_NO_CONNECTED_WORKERS = "SEC_NCV";
private static final String SEARCH_ERROR_CODE_TIMEOUT_ERROR = "SEC_TE";
private static final String SEARCH_ERROR_CODE_UKNOWN_ERROR = "SEC_UE";
private int workTimeout = WORK_TIMEOUT;
private WorkManager workManager;
private int amountOfWorkers;
private SearchService<SC, SRI> searchService;
private ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private Object nodeId;
/**
* Default constructor.
*
* @param topologyName
* Name of topology.
* @param searchService
* Specified search service.
*/
public SearchMaster(String topologyName, SearchService<SC, SRI> searchService) {
super();
DefaultTopology.Factory topologyFactory = new DefaultTopology.Factory(new BlockingQueueBasedPipe.Factory(ApplicationController.QUEUE_SIZE));
workManager = new DynamicWorkManager(topologyName, topologyFactory, new LoadBalancingRouter());
amountOfWorkers = 0;
this.searchService = searchService;
// Registering cluster listener for master
ClusterEvents.DefaultListener clusterListener = new ClusterListener();
ClusterEvents.registerListener(clusterListener);
try {
// Waiting for listener registration
clusterListener.waitForRegistration();
// Setting master node id
nodeId = clusterListener.getMyNodeId();
ApplicationController.getInstance().setMasterNodeIdHashCode(nodeId);
} catch (InterruptedException ex) {
throw new IllegalStateException("Interrupted by: " + ex);
}
}
/**
* Extended constructor.
*
* @param topologyName
* Name of topology.
* @param searchService
* Specified search service.
* @param workTimeout
* Specified timeout value in milliseconds for work part execution.
*/
public SearchMaster(String topologyName, SearchService<SC, SRI> searchService, int workTimeout) {
this(topologyName, searchService);
this.workTimeout = workTimeout;
}
/**
* Executes search work identified by search criteria.
*
* @param searchCriteria
* Specified search criteria.
* @return search results.
*/
@SuppressWarnings("unchecked")
public SearchResult<SC, SRI> executeSearchWork(SC searchCriteria) {
// If there are no connected workers then report error
if (amountOfWorkers == 0) {
SearchExecutionException error = new SearchExecutionException(ERROR_CODE_PREFIX + SEARCH_ERROR_CODE_NO_CONNECTED_WORKERS + ERROR_CODE_SUFFIX + "Can not schedule work because there are no connected workers.");
return new SearchResult<SC, SRI>(searchCriteria, null, error);
}
int dataSize = searchService.getSearchDataSize();
int intDataPart = dataSize / amountOfWorkers;
int leftDataPart = dataSize % amountOfWorkers;
List<WorkItem> items = new ArrayList<WorkItem>();
// if intDataPart == 0 then we need to send only one work part
if (intDataPart != 0) {
for (int i = 0; i < amountOfWorkers; i++) {
int startIndex = i * intDataPart;
int endIndex = (i + 1) * intDataPart - 1;
WorkItem item = scheduleSearchWork(new SearchWork<SC, SRI>(searchCriteria.toString(), i, startIndex, endIndex, searchCriteria));
items.add(item);
}
}
if (leftDataPart != 0) {
WorkItem item = scheduleSearchWork(new SearchWork<SC, SRI>(searchCriteria.toString(), amountOfWorkers + 1, amountOfWorkers * intDataPart, amountOfWorkers * intDataPart + leftDataPart - 1, searchCriteria));
items.add(item);
}
try {
boolean allCompletedSuccessfully = workManager.waitForAll(items, workTimeout);
// If all work parts completed successfully then build search result
if (allCompletedSuccessfully) {
List<SRI> results = new ArrayList<SRI>();
for (WorkItem workItem : items) {
SearchWork<SC, SRI> searchWork = (SearchWork<SC, SRI>) workItem.getResult();
if (searchWork.getWorkResults() != null && searchWork.getWorkResults().size() > 0) {
results.addAll(searchWork.getWorkResults());
}
}
return new SearchResult<SC, SRI>(searchCriteria, results, null);
}
// Not all work parts completed and additional handling is required
List<SRI> results = new ArrayList<SRI>();
int amountOfTimeoutedWorkParts = 0;
int amountOfRejectedWorkParts = 0;
for (WorkItem workItem : items) {
if (WorkEvent.WORK_COMPLETED == workItem.getStatus()) {
SearchWork<SC, SRI> searchWork = (SearchWork<SC, SRI>) workItem.getResult();
if (searchWork.getWorkResults() != null && searchWork.getWorkResults().size() > 0) {
results.addAll(searchWork.getWorkResults());
}
continue;
}
if (WorkEvent.WORK_REJECTED == workItem.getStatus()) {
amountOfRejectedWorkParts++;
continue;
}
amountOfTimeoutedWorkParts++;
}
StringBuilder errorMessageBuilder = new StringBuilder(ERROR_CODE_PREFIX + SEARCH_ERROR_CODE_TIMEOUT_ERROR + ERROR_CODE_SUFFIX + "Search completed and some results are returned but some errors occured in search process.");
errorMessageBuilder.append(" Amount of timeouted work parts = " + amountOfTimeoutedWorkParts);
errorMessageBuilder.append(".");
errorMessageBuilder.append(" Amount of rejected work parts = " + amountOfRejectedWorkParts);
return new SearchResult<SC, SRI>(searchCriteria, results, new SearchExecutionException(errorMessageBuilder.toString()));
} catch (IllegalArgumentException e) {
return new SearchResult<SC, SRI>(searchCriteria, null, new SearchExecutionException(ERROR_CODE_PREFIX + SEARCH_ERROR_CODE_UKNOWN_ERROR + ERROR_CODE_SUFFIX + "Work execution failed.", e));
} catch (InterruptedException e) {
return new SearchResult<SC, SRI>(searchCriteria, null, new SearchExecutionException(ERROR_CODE_PREFIX + SEARCH_ERROR_CODE_UKNOWN_ERROR + ERROR_CODE_SUFFIX + "Work execution failed.", e));
}
}
/**
* Schedules work.
*
* @param work
* Work for schedule.
* @return scheduled <code>WorkItem</code> instance.
*/
private WorkItem scheduleSearchWork(Work work) {
return workManager.schedule(work, this);
}
/**
* Handler for WORK_ACCEPTED event.
*/
public void workAccepted(WorkEvent we) {
// Do nothing. You can insert here your debug code or work event handler
}
/**
* Handler for WORK_COMPLETED event.
*/
public void workCompleted(WorkEvent we) {
// Do nothing. You can insert here your debug code or work event handler
}
/**
* Handler for WORK_REJECTED event.
*/
public void workRejected(WorkEvent we) {
// Do nothing. You can insert here your debug code or work event handler
}
/**
* Handler for WORK_STARTED event.
*/
public void workStarted(WorkEvent we) {
// Do nothing. You can insert here your debug code or work event handler
}
/**
* Represents listener of cluster events for master.
*
* @author YDrozhdzhal
*
*/
private class ClusterListener extends ClusterEvents.DefaultListener {
@Override
public void initialClusterMembers(Object[] nodeIds) {
for (Object nodeId : nodeIds) {
if (!isMasterNode(nodeId.toString())) {
readWriteLock.writeLock().lock();
// Increasing amount of workers
amountOfWorkers++;
readWriteLock.writeLock().unlock();
}
}
}
@Override
public void nodeConnected(Object nodeId) {
if (!isMasterNode(nodeId)) {
readWriteLock.writeLock().lock();
// Increasing amount of workers
amountOfWorkers++;
readWriteLock.writeLock().unlock();
}
}
@Override
public void nodeDisconnected(Object nodeId) {
if (!isMasterNode(nodeId)) {
readWriteLock.writeLock().lock();
// Decreasing amount of workers
amountOfWorkers--;
readWriteLock.writeLock().unlock();
}
}
@Override
public void thisNodeConnected(Object nodeId) {
ApplicationController.getInstance().setMasterNodeIdHashCode(nodeId);
}
@Override
public void thisNodeDisconnected(Object nodeId) {
ApplicationController.getInstance().setMasterNodeIdHashCode(null);
}
/**
* Determines if specified node id is master node.
*
* @param nodeId
* Specified node id.
* @return <code>true</code> if specified node id is master node.
*/
private boolean isMasterNode(Object nodeId) {
return nodeId.equals(getMyNodeId());
}
}
}
Основна логіка розподілу роботи на кілька частин є зашитою в методі
public SearchResult<SC, SRI> executeSearchWork(SC searchCriteria).
Тепер розглянемо класи, які використовуються для запуску аплікації. Для того, щоб показати, що в мастер можна кидати кілька робіт одночасно я зробив наступну річ:
користувач вводить число, а аплікація розв'язує поставлену задачу одразу для 5-х чисел, які вибираються на основі введеного користувачем числа. Якщо детальніше - якщо користувач ввів число a, то пошук здійснюється для чисел: a, a + 1, a + 2, a + 3, a + 4.Отже, клас
SearchTask, який відповідає за розв'язок однієї задачі:
КОД
import com.devua.search.masterworker.SearchMaster;
import com.devua.search.masterworker.SearchResult;
/**
* Represents search task.
*
* @author YDrozhdzhal
*
* @param <SC>
* Type of search criteria.
* @param <SRI>
* Type of search result item
*/
public class SearchTask<SC, SRI> implements Runnable {
private SC searchCriteria;
private SearchMaster<SC, SRI> searchMaster;
/**
* Default constructor.
*
* @param searchCriteria
* Search criteria.
* @param searchMaster
* Search master.
*/
public SearchTask(SC searchCriteria, SearchMaster<SC, SRI> searchMaster) {
super();
this.searchCriteria = searchCriteria;
this.searchMaster = searchMaster;
}
@Override
public void run() {
SearchResult<SC, SRI> result = searchMaster.executeSearchWork(searchCriteria);
if (result.getResults() == null) {
if (result.hasError()) {
System.err.println("Error occured when searching: " + result.getError().getMessage());
return;
}
System.err.println("Unknown error occured when searching.");
return;
}
if (result.hasError()) {
System.out.println("-----------------------------------------------------------------------------------------------"
+ "\nSearch completed with error:" + result.getError().getMessage() + " But some results are available.");
}
printResults(result);
}
/**
* Prints search result.
*
* @param result
* Results for printing.
*/
private void printResults(SearchResult<SC, SRI> result) {
System.out.println("-----------------------------------------------------------------------------------------------"
+ "\nAmount of collected results for search criteria " + result.getSearchCriteria() + ": "
+ result.getResults().size() + "\n" + "-----------------------------------------------------------------------------------------------");
}
}
Останнє, що залишилось, це конфігураційний файл
tc-config.xml:
КОД
<?xml version="1.0" encoding="UTF-8"?>
<tc:tc-config xsi:schemaLocation="http://www.terracotta.org/config http://www.terracotta.org/schema/terracotta-4.xsd"
xmlns:tc="http://www.terracotta.org/config" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<servers>
<server name="localhost" host="localhost">
<dso-port>9510</dso-port>
<jmx-port>9520</jmx-port>
<data>target/terracotta/server/data</data>
<logs>target/terracotta/server/logs</logs>
<statistics>target/terracotta/server/statistics</statistics>
</server>
<update-check>
<enabled>true</enabled>
</update-check>
</servers>
<!--*********************************************-->
<!--* Any app-level config changes made must be *-->
<!--* be mirrored in tc-config-prod.xml as well *-->
<!--*********************************************-->
<clients>
<logs>target/terracotta/clients/logs/%(tc.nodeName)</logs>
<statistics>target/terracotta/clients/statistics/%(tc.nodeName)</statistics>
<modules>
<module name="tim-annotations" version="1.2.3" />
<module name="tim-masterworker" version="1.2.1" />
</modules>
</clients>
<application>
<dso>
<instrumented-classes>
<include>
<class-expression>com.devua.search.global.ApplicationController</class-expression>
</include>
<include>
<class-expression>com.devua.search.service.impl.SearchServiceImpl</class-expression>
</include>
</instrumented-classes>
<transient-fields>
<field-name>com.devua.search.masterworker.SearchMaster.workManager</field-name>
</transient-fields>
</dso>
</application>
</tc:tc-config>
Оскільки ми використовуємо анотації та
tim-masterworker, то описуємо дані модулі в секції
<modules>.
В секцію інструментованих класів додаємо наш
ApplicationController та
SearchServiceImpl.
Ну і нарешті клас, який використовується для запуску аплікації,
StartApp:
КОД
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.devua.search.global.ApplicationController;
import com.devua.search.masterworker.SearchMaster;
import com.devua.search.masterworker.SearchWorker;
/**
* Starts application.
*
* @author YDrozhdzhal
*
*/
public class StartApp {
private static final int SIZE = 5;
private SearchMaster<Integer, Integer> searchMaster;
/**
* @param searchMaster
* the searchMaster to set
*/
public void setSearchMaster(SearchMaster<Integer, Integer> searchMaster) {
this.searchMaster = searchMaster;
}
public boolean readDataAndSchedule() {
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
Integer searchCriteria;
try {
System.out.println("Enter a search criteria number:");
String sc = reader.readLine();
try {
searchCriteria = new Integer(Integer.parseInt(sc));
} catch (Exception e) {
return false;
}
} catch (IOException e) {
return false;
}
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < SIZE; i++) {
executorService.execute(new SearchTask<Integer, Integer>(searchCriteria + i, searchMaster));
}
executorService.shutdown();
return true;
}
/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
if (ApplicationController.getInstance().getMasterNodeIdHashCode() == 0) {
StartApp app = new StartApp();
SearchMaster<Integer, Integer> sm = new SearchMaster<Integer, Integer>(ApplicationController.TOPOLOGY_NAME,
ApplicationController.getSearchService());
app.setSearchMaster(sm);
while (app.readDataAndSchedule()) {
}
} else {
SearchWorker searchWorker = new SearchWorker(ApplicationController.TOPOLOGY_NAME);
searchWorker.start();
}
}
}
Зверніть увагу, що і мастер і воркер повинні створюватись використовуючи однакові топології.Увесь приклад прикріплено до цього поста і його можна завантажити.
Для того, щоб його запустити Вам потрібно буде:
- з'єднання з інтернетом.
- коректно встановити Apache Maven 2.0.9, JDK 1.6 (я запускав на Update 12), Terracotta сервер і встановити всі відповідні системні змінні:
TC_INSTALL_DIR та
JAVA_HOME.
- збілдати аплікацію використовуючи файлик
build.bat.
- запустити файлик
required-tim-install.bat - який завантажить та проінсталює потрібні модулі в Terracotta сервер.
- запустити Terracotta сервер, використовуючи файлик
startServer.bat (
консоль номер 1).
- запустити Terracotta клієнт - нашу аплікацію, використовуючи файлик
startClient.bat (
консоль номер 2).
- почекати коли клієнт запуститься і стане мастером.
- запустити ще кілька Terracotta клієнтів, використовуючи файлик
startClient.bat (
консолі номер 3, 4 і т. д.)
- в
консолі номер 2 вводимо число і чекаємо на результати (кількість знайдених чисел для введеного числа), які будуть показані в
консолі номер 2.
- в консолях 3, 4 і т. д. ви можете бачити як розподілялись частинки роботи між воркерами, як вони приймались та оброблялись.