001/* Copyright (C) 2013 TU Dortmund
002 * This file is part of LearnLib, http://www.learnlib.de/.
003 * 
004 * LearnLib is free software; you can redistribute it and/or
005 * modify it under the terms of the GNU Lesser General Public
006 * License version 3.0 as published by the Free Software Foundation.
007 * 
008 * LearnLib is distributed in the hope that it will be useful,
009 * but WITHOUT ANY WARRANTY; without even the implied warranty of
010 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
011 * Lesser General Public License for more details.
012 * 
013 * You should have received a copy of the GNU Lesser General Public
014 * License along with LearnLib; if not, see
015 * <http://www.gnu.de/documents/lgpl.en.html>.
016 */
017package de.learnlib.parallelism;
018
019import java.util.Arrays;
020import java.util.Collection;
021import java.util.Iterator;
022import java.util.List;
023import java.util.concurrent.CountDownLatch;
024
025import de.learnlib.api.MembershipOracle;
026import de.learnlib.api.Query;
027
028
029/**
030 * A membership oracle that distributes a set of queries among several threads.
031 * 
032 * @author Malte Isberner <malte.isberner@gmail.com>
033 *
034 * @param <I> input symbol class
035 * @param <O> output class
036 */
037public class ParallelOracle<I, O> implements MembershipOracle<I, O> {
038        
039        
040        
041        // TODO Does this number make sense?
042        public static int DEFAULT_MIN_BATCH_SIZE = 10;
043        
044        private final int minBatchSize;
045        private final OracleWorker<I, O>[] workers;
046        private final Thread[] workerThreads;
047        private final MembershipOracle<I,O> thisThreadOracle;
048
049        /**
050         * Constructor for using (potentially) <i>separate</i> oracles for each worker thread,
051         * with a default minimum batch size.
052         * @param oracles the oracles to use for answering the queries. The cardinality of this
053         * list coincides with the number of threads created.
054         */
055        public ParallelOracle(List<? extends MembershipOracle<I,O>> oracles) {
056                this(oracles, DEFAULT_MIN_BATCH_SIZE);
057        }
058        
059        /**
060         * Constructor for using (potentially) <i>separate</i> oracles for each worker thread.
061         * @param oracles the oracles to use for answering the queries. The cardinality of this
062         * list coincides with the number of threads created.
063         * @param minBatchSize the minimum batch size
064         */
065        @SuppressWarnings("unchecked")
066        public ParallelOracle(List<? extends MembershipOracle<I,O>> oracles, int minBatchSize) {
067                int numOracles = oracles.size();
068                if(numOracles <= 0)
069                        throw new IllegalArgumentException("Must provide at least one oracle");
070                workers = new OracleWorker[numOracles - 1];
071                workerThreads = new Thread[numOracles - 1];
072                Iterator<? extends MembershipOracle<I,O>> it = oracles.iterator();
073                thisThreadOracle = it.next();
074                int i = 0;
075                while(it.hasNext()) {
076                        MembershipOracle<I,O> oracle = it.next();
077                        OracleWorker<I,O> worker = new OracleWorker<>(oracle);
078                        workers[i++] = worker;
079                }
080                this.minBatchSize = minBatchSize;
081        }
082        
083        /**
084         * Constructor for using one <i>shared</i> oracle for all worker threads,
085         * with the default minimum batch size.
086         * @param sharedOracle the shared oracle
087         * @param numInstances the number of threads to create
088         */
089        public ParallelOracle(MembershipOracle<I,O> sharedOracle, int numInstances) {
090                this(sharedOracle, numInstances, DEFAULT_MIN_BATCH_SIZE);
091        }
092        
093        /**
094         * Constructor for using one <i>shared</i> oracle for all worker threads,
095         * with a custom minimum batch size.
096         * @param sharedOracle the shared oracle
097         * @param numInstances the number of threads to create
098         * @param minBatchSize the minimum batch size
099         */
100        @SuppressWarnings("unchecked")
101        public ParallelOracle(MembershipOracle<I,O> sharedOracle, int numInstances, int minBatchSize) {
102                if(numInstances <= 0)
103                        throw new IllegalArgumentException("Must have at least one oracle instance");
104                numInstances--;
105                workers = new OracleWorker[numInstances];
106                workerThreads = new Thread[numInstances];
107                thisThreadOracle = sharedOracle;
108                for(int i = 0; i < numInstances; i++) {
109                        OracleWorker<I,O> worker = new OracleWorker<>(sharedOracle);
110                        workers[i] = worker;
111                }
112                this.minBatchSize = minBatchSize;
113        }
114        
115        /**
116         * Starts all worker threads.
117         */
118        public void start() {
119                if(workerThreads.length > 0 && workerThreads[0] != null)
120                        throw new IllegalStateException("ParallelOracle already started");
121                
122                for(int i = 0; i < workers.length; i++) {
123                        Thread t = new Thread(workers[i]);
124                        workerThreads[i] = t;
125                        t.start();
126                }
127        }
128        
129        /*
130         * (non-Javadoc)
131         * @see de.learnlib.api.MembershipOracle#processQueries(java.util.Collection)
132         */
133        @Override
134        @SuppressWarnings("unchecked")
135        public void processQueries(Collection<? extends Query<I, O>> queries) {
136                if(workerThreads.length > 0 && workerThreads[0] == null)
137                        throw new IllegalStateException("ParallelOracle was not started");
138                
139                int num = queries.size();
140                if(num <= 0)
141                        return;
142                
143                int numBatches = (num - minBatchSize)/minBatchSize + 1;
144                if(numBatches > workers.length + 1)
145                        numBatches = workers.length + 1;
146                
147                // Calculate the number of full and non-full batches. The difference in size
148                // will never exceed one (cf. pidgeonhole principle)
149                int fullBatchSize = (num - 1)/numBatches + 1;
150                int nonFullBatches = fullBatchSize*numBatches - num;
151                
152                Iterator<? extends Query<I,O>> queryIt = queries.iterator();
153                
154                // One batch is always executed in the local thread. This saves the thread creation
155                // overhead for the common case where the batch size is quite small.
156                int externalBatches = numBatches - 1;
157                
158                // If we decide not to need any external threads, we can save initializing synchronization
159                // measures.
160                CountDownLatch finishSignal = (externalBatches > 0) ? new CountDownLatch(externalBatches) : null;
161                
162                // Start the threads for the external batches
163                for(int i = 0; i < externalBatches; i++) {
164                        int bs = fullBatchSize;
165                        if(i < nonFullBatches)
166                                bs--;
167                        Query<I,O>[] batch = new Query[bs];
168                        for(int j = 0; j < bs; j++)
169                                batch[j] = queryIt.next();
170                        
171                        workers[i].offerBatch(batch, finishSignal);
172                }
173                
174                // Finally, prepare and process the batch for the oracle executed in this thread.
175                Query<I,O>[] batch = new Query[fullBatchSize];
176                for(int j = 0; j < fullBatchSize; j++)
177                        batch[j] = queryIt.next();
178                
179                thisThreadOracle.processQueries(Arrays.asList(batch));
180                
181                // FIXME: Needs deadlock prevention
182                if(finishSignal != null) {
183                        try {
184                                finishSignal.await();
185                        } catch (InterruptedException e) {
186                                throw new IllegalStateException(e);
187                        }
188                }
189        }
190        
191        /**
192         * Stop all worker threads. After this method has been called, invoking {@link #processQueries(Collection)}
193         * will result in an {@link IllegalStateException} until the next call to {@link #start()}
194         * is made. 
195         */
196        public void stop() {
197                if(workerThreads.length > 0 && workerThreads[0] == null)
198                        throw new IllegalStateException("Parallel oracle was not started");
199                
200                for(int i = 0; i < workers.length; i++) {
201                        workers[i].stop();
202                }
203                for(int i = 0; i < workerThreads.length; i++) {
204                        try {
205                                workerThreads[i].join();
206                        } catch (InterruptedException e) {
207                                // TODO Auto-generated catch block
208                                e.printStackTrace();
209                        }
210                        workerThreads[i] = null;
211                }
212        }
213
214}