Help - Search - Members - Calendar
Full Version: Terracotta
DEV UA > Java > Бібліотеки та інструменти від сторонніх розробників
TIGER
Привіт.

Довелось стикнутись з такою бібліотекою як Terracotta.

Може хтось мав із нею справу або маєте якусь документацію на москальській або українській(хе-хе smile.gif ) мові?
professor
Звідки в мене дике відчуття, що ти будеш першим?
Sulamith Lusor
гарна назва )
художня )))
TIGER
Треба застосувати Master/Worker pattern, використовуючи кластерінг за допомогою Terracotta та її модуля tim-messaging (кластеризувати певну функціональність). По самому tim-messaging документації не дуже багато. Може хтось з таким стикався?
TIGER
Наразі я то трохи порозгрібав, принаймні Master/Worker pattern, його tim-messaging і написав його чорнову імплементацію. Протягом наступного тижня буду її вдосконалювати і, можливо, на наступні вихідні викладу сурс код з коментарями, поясненнями та інструкціями.
Pegasus
книжка на Гуглі про Terracotta.
TIGER
Для початку вона згодиться, правда дядьки з Terracotta форуму пишуть що та модель, Master/Worker яка є в книжці вже давно є "deprecated". Я використав їхній TIM tim-messaging, який має новішу реалізацію моделі.
TIGER
Ну от нарешті в мене знайшлось трохи вільного часу, щоб представити вам реалізацію моделі Master/Worker для кластерних аплікацій, використовуючи Terracotta сервер та його інтеграційний модуль tim-masterworker (це підмодуль модуля tim-messaging).

Насамперед поставимо наступну задачу, для якої застосуємо модель Master/Worker, а також розподіл однієї великої роботи на кілька дрібніших і їх паралельне виконання на кількох машинах під управлінням Terracotta сервера.

Отже задача, нехай маємо перших N цілих чисел (N=2000000), користувач вводить число k, 0<=k<=N, потрібно знайти всі числа (їх кількість), які діляться на число k без остачі.

Зразу хочу звернути вашу увагу на те, що приклад зроблено не зовсім оптимально і так як би мало бути в ідеалі, оскільки я з Terracotta працюю не довше, ніж місяць, і хотів показати як працювати з Terracotta root-objects (шарені кластерні дані). Крім того, я буду використовувати міксовану техніку конфігурації: анотації + tc-config.xml.

Даний приклад не є шаблоном, але більша його частина може бути взята за основу для розробки власних рішень.

Я не буду детально описувати що таке Terracotta, root-object, instrumented-class, TIM та інші аспекти, оскільки про це все можна прочитати у відповідній літературі(в розділі документації на нашому форумі є посилання на книжку по Terracotta, а про модель Master/Worker можна почитати в 11 розділі цієї книги).

Розроблена аплікація буде працювати лише в кластерному режимі і вимагає, щоб був запущений Terracotta сервер, один Master та один Worker. В звичайному режимі дана аплікація працювати не буде.

Отже, повернемось до нашої задачі.

Для початку опишемо наш глобальний клас, який назвемо ApplicationController - в цьому класі міститимуться зашарені кластерні поля та об'єкти, константи, зашарений кластерний lock-об'єкт та глобальні методи (все це документовано JavaDoc-ами та коментарями):

КОД
package com.devua.search.global;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.terracotta.modules.annotations.Root;

import com.devua.search.service.SearchService;
import com.devua.search.service.impl.SearchServiceImpl;

/**
* Contains all fields used by Master/Worker and global application methods.
*
* @author YDrozhdzhal
*
*/
public class ApplicationController {

    /**
     * Shared topology name between master and worker.
     */
    public static final String TOPOLOGY_NAME = "SEARCH_MASTER_WORKER_TOPOLOGY";

    /**
     * Size of queue.
     */
    public static final int QUEUE_SIZE = 1000;

    /**
     * Master node id hash code value.
     */
    @Root
    private int masterNodeIdHashCode;

    /**
     * Shared clustered lock.
     */
    @Root
    private static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();

    /**
     * This class is singleton and this variable store single instance of this class.
     */
    private static ApplicationController instance;

    /**
     * Search service instance.
     */
    private static SearchService<Integer, Integer> searchService = new SearchServiceImpl();

    /**
     * Search data size.
     */
    public static final int SEARCH_DATA_SIZE = 2000000;

    /**
     * Data for searching.
     */
    @Root
    private List<Integer> dataToSearch;

    /**
     * Private constructor.
     */
    private ApplicationController() {
        readWriteLock.writeLock().lock();

        if (dataToSearch == null) {
            dataToSearch = new ArrayList<Integer>();

            initDataToSearch();
        }

        readWriteLock.writeLock().unlock();
    }

    /**
     * Obtains instance of <code>AppricationController</code> class.
     *
     * @return instance of <code>AppricationController</code> class.
     */
    public static ApplicationController getInstance() {
        if (instance == null) {
            instance = new ApplicationController();
        }

        return instance;
    }

    /**
     * Initializes data to search.
     */
    private void initDataToSearch() {
        if (dataToSearch.size() <= 0) {

            for (int i = 0; i < SEARCH_DATA_SIZE; i++) {
                dataToSearch.add(i + 1);
            }

        }
    }

    /**
     * @return the masterNodeIdHashCode
     */
    public int getMasterNodeIdHashCode() {
        int nodeIdHashCode;

        readWriteLock.readLock().lock();
        nodeIdHashCode = masterNodeIdHashCode;
        readWriteLock.readLock().unlock();

        return nodeIdHashCode;
    }

    /**
     * @param masterNodeId
     *                the masterNodeId to set
     */
    public void setMasterNodeIdHashCode(Object masterNodeId) {
        readWriteLock.writeLock().lock();

        this.masterNodeIdHashCode = 0;

        if (masterNodeId != null) {
            this.masterNodeIdHashCode = masterNodeId.toString().hashCode();
        }

        readWriteLock.writeLock().unlock();
    }

    /**
     * @return the dataToSearch
     */
    public List<Integer> getDataToSearch() {
        List<Integer> result;

        readWriteLock.readLock().lock();
        result = dataToSearch;
        readWriteLock.readLock().unlock();

        return result;
    }

    /**
     * Determines if specified node id is master node.
     *
     * @param nodeId
     *                Specified node id.
     * @return <code>true</code> if specified node id is master node.
     */
    public boolean isMasterNode(Object nodeId) {
        return nodeId.toString().hashCode() == masterNodeIdHashCode;
    }

    /**
     * @return the searchService
     */
    public static SearchService<Integer, Integer> getSearchService() {
        return searchService;
    }

}


Даний клас є singleton. Зверніть увагу на анотацію @Root - це означає, що дане поле є root-object, або зашареним кластерним об'єктом.

Поле readWriteLock - є шареним кластерним локом, і оскільки воно є типу java.util.concurrent.locks.ReentrantReadWriteLock, то всі методи, які використовують його для блокування не повинні бути описані в Terracotta locks, оскільки Terracotta автоматично знає як блокувати методи, що використовують блокування даного типу.

Поле QUEUE_SIZE - розмір черги Worker-а - максимальна кількість об'єктів Work, яка може бути у внутрішній черзі Worker-а.
Також слід звернути увагу на метод initDataToSearch(), який ініціалізує колекцію даних, по якій буде здійснюватись пошук - даний метод проініціалізує колекцію даних в кластері лише раз, хоча буде викликатись стільки разів, скільки буде запущено аплікацій.

Методи, які містять в назві приставку MasterNodeIdHashCode вткористовуються для автоматичного визначення чи нода є мастером, чи нода є воркером. Зразу скажу, що дане автоматичне визначення не є досконалим і інколи вимагає перезавантажити Terracotta сервер щоб якусь ноду зробити мастером. Краще аикористовувати явне присвоєння ролей - але не це я хотів показати - хто захоче, той реалізує таку штуку. Крім того дане автоматичне визначення передбачає, що буде кілька воркерів і лише один мастер, хоча насправді мастерів теж може бути кілька.

Наступним наведу SearchService інтерфейс:

КОД
package com.devua.search.service;

import java.util.List;

/**
* Contains methods for search service.
*
* @author YDrozhdzhal
*
* @param <SC>
*                Type of search criteria.
* @param <SRI>
*                Type of search result item.
*/
public interface SearchService<SC, SRI> {

    /**
     * Searches data using specified search criteria and index range.
     *
     * @param searchCriteria
     *                Specified search criteria.
     * @param startIndex
     *                Specified start index.
     * @param endIndex
     *                Specified end index.
     * @return matched items.
     */
    List<SRI> search(SC searchCriteria, int startIndex, int endIndex);

    /**
     * Obtains search data size.
     *
     * @return search data size.
     */
    int getSearchDataSize();

}


Далі його реалізація, SearchServiceImpl:

КОД
package com.devua.search.service.impl;

import java.util.ArrayList;
import java.util.List;

import com.devua.search.global.ApplicationController;
import com.devua.search.service.SearchService;

/**
* Implements search service interface for numbers searching task.
*
* @author YDrozhdzhal
*
*/
public class SearchServiceImpl implements SearchService<Integer, Integer> {

    @Override
    public List<Integer> search(Integer searchCriteria, int startIndex, int endIndex) {
        List<Integer> results = new ArrayList<Integer>();

        List<Integer> dataToSearch = ApplicationController.getInstance().getDataToSearch().subList(startIndex,
                endIndex + 1);

        for (Integer dataItem : dataToSearch) {
            if ((dataItem % searchCriteria) == 0) {
                results.add(dataItem);
            }
        }

        return results;
    }

    @Override
    public int getSearchDataSize() {
        return ApplicationController.SEARCH_DATA_SIZE;
    }

}


Наступним буде опис класу SearchWork, точніше його основної частини:

КОД
package com.devua.search.masterworker;

import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import com.devua.search.global.ApplicationController;
import com.devua.search.service.SearchService;
import commonj.work.Work;

/**
* Represents search work.
*
* @author YDrozhdzhal
*
* @param <SC>
*                Type of search criteria.
* @param <SRI>
*                Type of search result item.
*/
public class SearchWork<SC, SRI> implements Work {

    private String searchWorkId;
    private Integer workId;
    private Integer startIndex;
    private Integer endIndex;
    private List<SRI> workResults;
    private SC searchCriteria;
    private ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();

    /**
     * Default constructor.
     */
    public SearchWork() {
    }

    /**
     * Full constructor.
     *
     * @param searchWorkId
     * @param workId
     * @param startIndex
     * @param endIndex
     * @param searchCriteria
     */
    public SearchWork(String searchWorkId, Integer workId, Integer startIndex, Integer endIndex, SC searchCriteria) {
        super();
        this.searchWorkId = searchWorkId;
        this.workId = workId;
        this.startIndex = startIndex;
        this.endIndex = endIndex;
        this.searchCriteria = searchCriteria;
    }

    ...

    public boolean isDaemon() {
        return false;
    }

    public void release() {
    }

    /**
     * Runs search work.
     */
    @Override
    @SuppressWarnings("unchecked")
    public void run() {
        SearchService<SC, SRI> searchService = (SearchService<SC, SRI>) ApplicationController.getSearchService();

        List<SRI> results = searchService.search(searchCriteria, startIndex, endIndex);

        if (results.size() > 0) {
            readWriteLock.writeLock().lock();
            workResults = results;
            readWriteLock.writeLock().unlock();
        }
    }
}


Далі клас SearchWorker:

КОД
package com.devua.search.masterworker;

import java.util.concurrent.Executors;


import org.terracotta.jmx.util.events.ClusterEvents;
import org.terracotta.masterworker.Worker;
import org.terracotta.message.pipe.BlockingQueueBasedPipe;
import org.terracotta.message.topology.DefaultTopology;
import org.terracotta.workmanager.dynamic.DynamicWorkerFactory;

import com.devua.search.global.ApplicationController;

/**
* Represents search worker.
*
* @author YDrozhdzhal
*
*/
@SuppressWarnings("unchecked")
public class SearchWorker implements Worker {

    private Worker worker;

    /**
     * Default constructor.
     *
     * @param topologyName
     *            Name of topology.
     * @throws Exception
     */
    public SearchWorker(String topologyName) throws Exception {
        // Registering cluster listener for master
        ClusterEvents.DefaultListener clusterListener = new ClusterListener();
        ClusterEvents.registerListener(clusterListener);

        try {
            clusterListener.waitForRegistration();
        } catch (InterruptedException ex) {
            throw new IllegalStateException("Interrupted by: " + ex);
        }

        DynamicWorkerFactory workerFactory = new DynamicWorkerFactory(topologyName, new DefaultTopology.Factory(new BlockingQueueBasedPipe.Factory(ApplicationController.QUEUE_SIZE)), Executors.newCachedThreadPool());

        worker = workerFactory.create();
    }

    @Override
    public void start() throws Exception {
        worker.start();
    }

    @Override
    public void stop() {
        worker.stop();
    }

    /**
     * Represents listener of cluster events for master.
     *
     * @author YDrozhdzhal
     *
     */
    private class ClusterListener extends ClusterEvents.DefaultListener {

        @Override
        public void nodeDisconnected(Object nodeId) {
            if (ApplicationController.getInstance().isMasterNode(nodeId)) {
                ApplicationController.getInstance().setMasterNodeIdHashCode(null);
            }
        }

    }

}


В цьому класі я реєструю слухача кластерних подій та створюю воркера з tim-masterworker. Зверніть увагу на наступний рядок коду:

КОД
DynamicWorkerFactory workerFactory = new DynamicWorkerFactory(topologyName, new DefaultTopology.Factory(new BlockingQueueBasedPipe.Factory(ApplicationController.QUEUE_SIZE)), Executors.newCachedThreadPool());


а точніше на Executors.newCachedThreadPool() - це означає, що кожний об'єкт типу SearchWork - буде оброблятись в окремому потоці - точніше черга воркера буде опрацьовуватись кількома потоками одночасно.

Тепер розглянемо клас SearchMaster:

КОД
package com.devua.search.masterworker;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.terracotta.jmx.util.events.ClusterEvents;
import org.terracotta.message.pipe.BlockingQueueBasedPipe;
import org.terracotta.message.routing.LoadBalancingRouter;
import org.terracotta.message.topology.DefaultTopology;
import org.terracotta.modules.annotations.HonorTransient;
import org.terracotta.workmanager.dynamic.DynamicWorkManager;

import com.devua.search.global.ApplicationController;
import com.devua.search.service.SearchService;
import commonj.work.Work;
import commonj.work.WorkEvent;
import commonj.work.WorkItem;
import commonj.work.WorkListener;
import commonj.work.WorkManager;

/**
* Represents master for search process.
*
* @author YDrozhdzhal
*
* @param <SC>
*            Type of search criteria.
* @param <SRI>
*            Type of search result item.
*/
@HonorTransient
public class SearchMaster<SC, SRI> implements WorkListener {

    /**
     * Default work timeout.
     */
    private static final int WORK_TIMEOUT = 120000;

    // Error codes
    private static final String ERROR_CODE_PREFIX = "Error ";
    private static final String ERROR_CODE_SUFFIX = ": ";
    private static final String SEARCH_ERROR_CODE_NO_CONNECTED_WORKERS = "SEC_NCV";
    private static final String SEARCH_ERROR_CODE_TIMEOUT_ERROR = "SEC_TE";
    private static final String SEARCH_ERROR_CODE_UKNOWN_ERROR = "SEC_UE";

    private int workTimeout = WORK_TIMEOUT;
    private WorkManager workManager;
    private int amountOfWorkers;
    private SearchService<SC, SRI> searchService;
    private ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    private Object nodeId;

    /**
     * Default constructor.
     *
     * @param topologyName
     *            Name of topology.
     * @param searchService
     *            Specified search service.
     */
    public SearchMaster(String topologyName, SearchService<SC, SRI> searchService) {
        super();
        DefaultTopology.Factory topologyFactory = new DefaultTopology.Factory(new BlockingQueueBasedPipe.Factory(ApplicationController.QUEUE_SIZE));

        workManager = new DynamicWorkManager(topologyName, topologyFactory, new LoadBalancingRouter());

        amountOfWorkers = 0;

        this.searchService = searchService;

        // Registering cluster listener for master
        ClusterEvents.DefaultListener clusterListener = new ClusterListener();
        ClusterEvents.registerListener(clusterListener);

        try {
            // Waiting for listener registration
            clusterListener.waitForRegistration();

            // Setting master node id
            nodeId = clusterListener.getMyNodeId();
            ApplicationController.getInstance().setMasterNodeIdHashCode(nodeId);
        } catch (InterruptedException ex) {
            throw new IllegalStateException("Interrupted by: " + ex);
        }
    }

    /**
     * Extended constructor.
     *
     * @param topologyName
     *            Name of topology.
     * @param searchService
     *            Specified search service.
     * @param workTimeout
     *            Specified timeout value in milliseconds for work part execution.
     */
    public SearchMaster(String topologyName, SearchService<SC, SRI> searchService, int workTimeout) {
        this(topologyName, searchService);
        this.workTimeout = workTimeout;
    }

    /**
     * Executes search work identified by search criteria.
     *
     * @param searchCriteria
     *            Specified search criteria.
     * @return search results.
     */
    @SuppressWarnings("unchecked")
    public SearchResult<SC, SRI> executeSearchWork(SC searchCriteria) {
        // If there are no connected workers then report error
        if (amountOfWorkers == 0) {
            SearchExecutionException error = new SearchExecutionException(ERROR_CODE_PREFIX + SEARCH_ERROR_CODE_NO_CONNECTED_WORKERS + ERROR_CODE_SUFFIX + "Can not schedule work because there are no connected workers.");

            return new SearchResult<SC, SRI>(searchCriteria, null, error);
        }

        int dataSize = searchService.getSearchDataSize();
        int intDataPart = dataSize / amountOfWorkers;
        int leftDataPart = dataSize % amountOfWorkers;

        List<WorkItem> items = new ArrayList<WorkItem>();

        // if intDataPart == 0 then we need to send only one work part
        if (intDataPart != 0) {
            for (int i = 0; i < amountOfWorkers; i++) {
                int startIndex = i * intDataPart;
                int endIndex = (i + 1) * intDataPart - 1;

                WorkItem item = scheduleSearchWork(new SearchWork<SC, SRI>(searchCriteria.toString(), i, startIndex, endIndex, searchCriteria));
                items.add(item);
            }
        }

        if (leftDataPart != 0) {
            WorkItem item = scheduleSearchWork(new SearchWork<SC, SRI>(searchCriteria.toString(), amountOfWorkers + 1, amountOfWorkers * intDataPart, amountOfWorkers * intDataPart + leftDataPart - 1, searchCriteria));
            items.add(item);
        }

        try {
            boolean allCompletedSuccessfully = workManager.waitForAll(items, workTimeout);

            // If all work parts completed successfully then build search result
            if (allCompletedSuccessfully) {
                List<SRI> results = new ArrayList<SRI>();

                for (WorkItem workItem : items) {
                    SearchWork<SC, SRI> searchWork = (SearchWork<SC, SRI>) workItem.getResult();

                    if (searchWork.getWorkResults() != null && searchWork.getWorkResults().size() > 0) {
                        results.addAll(searchWork.getWorkResults());
                    }
                }

                return new SearchResult<SC, SRI>(searchCriteria, results, null);
            }

            // Not all work parts completed and additional handling is required
            List<SRI> results = new ArrayList<SRI>();

            int amountOfTimeoutedWorkParts = 0;
            int amountOfRejectedWorkParts = 0;

            for (WorkItem workItem : items) {
                if (WorkEvent.WORK_COMPLETED == workItem.getStatus()) {
                    SearchWork<SC, SRI> searchWork = (SearchWork<SC, SRI>) workItem.getResult();

                    if (searchWork.getWorkResults() != null && searchWork.getWorkResults().size() > 0) {
                        results.addAll(searchWork.getWorkResults());
                    }

                    continue;
                }

                if (WorkEvent.WORK_REJECTED == workItem.getStatus()) {
                    amountOfRejectedWorkParts++;
                    continue;
                }

                amountOfTimeoutedWorkParts++;
            }

            StringBuilder errorMessageBuilder = new StringBuilder(ERROR_CODE_PREFIX + SEARCH_ERROR_CODE_TIMEOUT_ERROR + ERROR_CODE_SUFFIX + "Search completed and some results are returned but some errors occured in search process.");

            errorMessageBuilder.append(" Amount of timeouted work parts = " + amountOfTimeoutedWorkParts);
            errorMessageBuilder.append(".");
            errorMessageBuilder.append(" Amount of rejected work parts = " + amountOfRejectedWorkParts);

            return new SearchResult<SC, SRI>(searchCriteria, results, new SearchExecutionException(errorMessageBuilder.toString()));
        } catch (IllegalArgumentException e) {
            return new SearchResult<SC, SRI>(searchCriteria, null, new SearchExecutionException(ERROR_CODE_PREFIX + SEARCH_ERROR_CODE_UKNOWN_ERROR + ERROR_CODE_SUFFIX + "Work execution failed.", e));
        } catch (InterruptedException e) {
            return new SearchResult<SC, SRI>(searchCriteria, null, new SearchExecutionException(ERROR_CODE_PREFIX + SEARCH_ERROR_CODE_UKNOWN_ERROR + ERROR_CODE_SUFFIX + "Work execution failed.", e));
        }
    }

    /**
     * Schedules work.
     *
     * @param work
     *            Work for schedule.
     * @return scheduled <code>WorkItem</code> instance.
     */
    private WorkItem scheduleSearchWork(Work work) {
        return workManager.schedule(work, this);
    }

    /**
     * Handler for WORK_ACCEPTED event.
     */
    public void workAccepted(WorkEvent we) {
        // Do nothing. You can insert here your debug code or work event handler
    }

    /**
     * Handler for WORK_COMPLETED event.
     */
    public void workCompleted(WorkEvent we) {
        // Do nothing. You can insert here your debug code or work event handler
    }

    /**
     * Handler for WORK_REJECTED event.
     */
    public void workRejected(WorkEvent we) {
        // Do nothing. You can insert here your debug code or work event handler
    }

    /**
     * Handler for WORK_STARTED event.
     */
    public void workStarted(WorkEvent we) {
        // Do nothing. You can insert here your debug code or work event handler
    }

    /**
     * Represents listener of cluster events for master.
     *
     * @author YDrozhdzhal
     *
     */
    private class ClusterListener extends ClusterEvents.DefaultListener {

        @Override
        public void initialClusterMembers(Object[] nodeIds) {
            for (Object nodeId : nodeIds) {
                if (!isMasterNode(nodeId.toString())) {
                    readWriteLock.writeLock().lock();

                    // Increasing amount of workers
                    amountOfWorkers++;
                    readWriteLock.writeLock().unlock();
                }
            }
        }

        @Override
        public void nodeConnected(Object nodeId) {
            if (!isMasterNode(nodeId)) {
                readWriteLock.writeLock().lock();

                // Increasing amount of workers
                amountOfWorkers++;
                readWriteLock.writeLock().unlock();
            }
        }

        @Override
        public void nodeDisconnected(Object nodeId) {
            if (!isMasterNode(nodeId)) {
                readWriteLock.writeLock().lock();

                // Decreasing amount of workers
                amountOfWorkers--;
                readWriteLock.writeLock().unlock();
            }
        }

        @Override
        public void thisNodeConnected(Object nodeId) {
            ApplicationController.getInstance().setMasterNodeIdHashCode(nodeId);
        }

        @Override
        public void thisNodeDisconnected(Object nodeId) {
            ApplicationController.getInstance().setMasterNodeIdHashCode(null);
        }

        /**
         * Determines if specified node id is master node.
         *
         * @param nodeId
         *            Specified node id.
         * @return <code>true</code> if specified node id is master node.
         */
        private boolean isMasterNode(Object nodeId) {
            return nodeId.equals(getMyNodeId());
        }

    }

}


Основна логіка розподілу роботи на кілька частин є зашитою в методі public SearchResult<SC, SRI> executeSearchWork(SC searchCriteria).

Тепер розглянемо класи, які використовуються для запуску аплікації. Для того, щоб показати, що в мастер можна кидати кілька робіт одночасно я зробив наступну річ:

користувач вводить число, а аплікація розв'язує поставлену задачу одразу для 5-х чисел, які вибираються на основі введеного користувачем числа. Якщо детальніше - якщо користувач ввів число a, то пошук здійснюється для чисел: a, a + 1, a + 2, a + 3, a + 4.

Отже, клас SearchTask, який відповідає за розв'язок однієї задачі:

КОД
import com.devua.search.masterworker.SearchMaster;
import com.devua.search.masterworker.SearchResult;

/**
* Represents search task.
*
* @author YDrozhdzhal
*
* @param <SC>
*            Type of search criteria.
* @param <SRI>
*            Type of search result item
*/
public class SearchTask<SC, SRI> implements Runnable {

    private SC searchCriteria;
    private SearchMaster<SC, SRI> searchMaster;

    /**
     * Default constructor.
     *
     * @param searchCriteria
     *            Search criteria.
     * @param searchMaster
     *            Search master.
     */
    public SearchTask(SC searchCriteria, SearchMaster<SC, SRI> searchMaster) {
        super();
        this.searchCriteria = searchCriteria;
        this.searchMaster = searchMaster;
    }

    @Override
    public void run() {
        SearchResult<SC, SRI> result = searchMaster.executeSearchWork(searchCriteria);

        if (result.getResults() == null) {
            if (result.hasError()) {
                System.err.println("Error occured when searching: " + result.getError().getMessage());
                return;
            }

            System.err.println("Unknown error occured when searching.");
            return;
        }

        if (result.hasError()) {
            System.out.println("-----------------------------------------------------------------------------------------------"
                + "\nSearch completed with error:" + result.getError().getMessage() + " But some results are available.");
        }

        printResults(result);
    }

    /**
     * Prints search result.
     *
     * @param result
     *            Results for printing.
     */
    private void printResults(SearchResult<SC, SRI> result) {
        System.out.println("-----------------------------------------------------------------------------------------------"
            + "\nAmount of collected results for search criteria " + result.getSearchCriteria() + ": "
            + result.getResults().size() + "\n" + "-----------------------------------------------------------------------------------------------");
    }

}


Останнє, що залишилось, це конфігураційний файл tc-config.xml:

КОД
<?xml version="1.0" encoding="UTF-8"?>
<tc:tc-config xsi:schemaLocation="http://www.terracotta.org/config http://www.terracotta.org/schema/terracotta-4.xsd"
    xmlns:tc="http://www.terracotta.org/config" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <servers>
        <server name="localhost" host="localhost">
            <dso-port>9510</dso-port>
            <jmx-port>9520</jmx-port>
            <data>target/terracotta/server/data</data>
            <logs>target/terracotta/server/logs</logs>
            <statistics>target/terracotta/server/statistics</statistics>
        </server>
        <update-check>
            <enabled>true</enabled>
        </update-check>
    </servers>
    <!--*********************************************-->
    <!--* Any app-level config changes made must be *-->
    <!--* be mirrored in tc-config-prod.xml as well *-->
    <!--*********************************************-->
    <clients>
        <logs>target/terracotta/clients/logs/%(tc.nodeName)</logs>
        <statistics>target/terracotta/clients/statistics/%(tc.nodeName)</statistics>
        <modules>
            <module name="tim-annotations" version="1.2.3" />
            <module name="tim-masterworker" version="1.2.1" />
        </modules>
    </clients>
    <application>
        <dso>
            <instrumented-classes>
                <include>
                    <class-expression>com.devua.search.global.ApplicationController</class-expression>
                </include>
                <include>
                    <class-expression>com.devua.search.service.impl.SearchServiceImpl</class-expression>
                </include>
            </instrumented-classes>
            <transient-fields>
                <field-name>com.devua.search.masterworker.SearchMaster.workManager</field-name>
            </transient-fields>
        </dso>
    </application>
</tc:tc-config>


Оскільки ми використовуємо анотації та tim-masterworker, то описуємо дані модулі в секції <modules>.
В секцію інструментованих класів додаємо наш ApplicationController та SearchServiceImpl.

Ну і нарешті клас, який використовується для запуску аплікації, StartApp:

КОД
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.devua.search.global.ApplicationController;
import com.devua.search.masterworker.SearchMaster;
import com.devua.search.masterworker.SearchWorker;

/**
* Starts application.
*
* @author YDrozhdzhal
*
*/
public class StartApp {

    private static final int SIZE = 5;
    private SearchMaster<Integer, Integer> searchMaster;

    /**
     * @param searchMaster
     *                the searchMaster to set
     */
    public void setSearchMaster(SearchMaster<Integer, Integer> searchMaster) {
        this.searchMaster = searchMaster;
    }

    public boolean readDataAndSchedule() {
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));

        Integer searchCriteria;
        try {
            System.out.println("Enter a search criteria number:");
            String sc = reader.readLine();

            try {
                searchCriteria = new Integer(Integer.parseInt(sc));
            } catch (Exception e) {
                return false;
            }
        } catch (IOException e) {
            return false;
        }

        ExecutorService executorService = Executors.newCachedThreadPool();

        for (int i = 0; i < SIZE; i++) {
            executorService.execute(new SearchTask<Integer, Integer>(searchCriteria + i, searchMaster));
        }

        executorService.shutdown();

        return true;
    }

    /**
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        if (ApplicationController.getInstance().getMasterNodeIdHashCode() == 0) {
            StartApp app = new StartApp();

            SearchMaster<Integer, Integer> sm = new SearchMaster<Integer, Integer>(ApplicationController.TOPOLOGY_NAME,
                    ApplicationController.getSearchService());
            app.setSearchMaster(sm);

            while (app.readDataAndSchedule()) {
            }
        } else {
            SearchWorker searchWorker = new SearchWorker(ApplicationController.TOPOLOGY_NAME);
            searchWorker.start();
        }

    }

}


Зверніть увагу, що і мастер і воркер повинні створюватись використовуючи однакові топології.

Увесь приклад прикріплено до цього поста і його можна завантажити.

Для того, щоб його запустити Вам потрібно буде:

- з'єднання з інтернетом.
- коректно встановити Apache Maven 2.0.9, JDK 1.6 (я запускав на Update 12), Terracotta сервер і встановити всі відповідні системні змінні: TC_INSTALL_DIR та JAVA_HOME.
- збілдати аплікацію використовуючи файлик build.bat.
- запустити файлик required-tim-install.bat - який завантажить та проінсталює потрібні модулі в Terracotta сервер.
- запустити Terracotta сервер, використовуючи файлик startServer.bat (консоль номер 1).
- запустити Terracotta клієнт - нашу аплікацію, використовуючи файлик startClient.bat (консоль номер 2).
- почекати коли клієнт запуститься і стане мастером.
- запустити ще кілька Terracotta клієнтів, використовуючи файлик startClient.bat (консолі номер 3, 4 і т. д.)
- в консолі номер 2 вводимо число і чекаємо на результати (кількість знайдених чисел для введеного числа), які будуть показані в консолі номер 2.
- в консолях 3, 4 і т. д. ви можете бачити як розподілялись частинки роботи між воркерами, як вони приймались та оброблялись.
TIGER
Захотілось ще якось на ту теракотну штуку написати тести і запускати їх з мавена. Тести наразі ще не написав, але сексу було немало.

Спочатку надибав цю статтю.

Заватажив собі сурси з Terracotta SVN, які стосуються потрібної теми.

Два дні сексу, і я зміг запустити тести для викачаного проекту.

Якщо вдасться написати тести для наведеного прикладу Master/Worker - то я їх викладу в наступному пості.
TIGER
Вже кілька разів доводилось мати справу з так званими літералами в Terracotta (Terracotta literals), а точніше з класом String, типами boolean та int. Так от, зі своїх експериментів можу зробити наступні висновки, базуючись на використанні Terracotta 2.7.3:


- String не поводиться як літерал, те що документація Terracotta пише про нього - брехня - один раз присвоєне значення вже більше ніколи не можна поміняти;

- boolean не поводиться як літерал, те що документація Terracotta пише про нього - брехня - один раз присвоєне значення вже більше ніколи не можна поміняти;

- int - єдиний літерал, який повів себе так, як пише документація.


Деталі тут.

З цього всього можна зробити такий висновок: Можна довіряти документації від Terracotta, але потрібно все перевіряти на практиці - особливо роботу з літералами.
TIGER
Сьогодні натрапив на бажок в їхній батьківській помці для теракота тестів. Бажок пов'язаний з використанням Maven 2.1.0 як білдера для проекту, а точніше: їхня помка використовує Groovy plugin, який вилітає з NullPointerException при запуску з-під Maven 2.1.0. Деталі читайте тут.
TIGER
Вийшла Terracotta 3.0.0 - деталі читайте тут.
TIGER
Вчора наткнувся на бажок коли використовується tim-master-worker і terracotta-api (я мігрував аплікацію на нову систему кластерних подій, оскільки стара, базована на JMX, вже не підтримується і в недалекому майбутньому буде видалена).

Деталі про бажок можна почитати тут.
TIGER
Новий нюанс при роботі з Теракотою. Деталі можете почитати тут.
This is a "lo-fi" version of our main content. To view the full version with more information, formatting and images, please click here.
Форум IP.Board © 2001-2010 IPS, Inc.