001/* Copyright (C) 2013 TU Dortmund 002 * This file is part of LearnLib, http://www.learnlib.de/. 003 * 004 * LearnLib is free software; you can redistribute it and/or 005 * modify it under the terms of the GNU Lesser General Public 006 * License version 3.0 as published by the Free Software Foundation. 007 * 008 * LearnLib is distributed in the hope that it will be useful, 009 * but WITHOUT ANY WARRANTY; without even the implied warranty of 010 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 011 * Lesser General Public License for more details. 012 * 013 * You should have received a copy of the GNU Lesser General Public 014 * License along with LearnLib; if not, see 015 * <http://www.gnu.de/documents/lgpl.en.html>. 016 */ 017package de.learnlib.parallelism; 018 019import java.util.Arrays; 020import java.util.Collection; 021import java.util.Iterator; 022import java.util.List; 023import java.util.concurrent.CountDownLatch; 024 025import de.learnlib.api.MembershipOracle; 026import de.learnlib.api.Query; 027 028 029/** 030 * A membership oracle that distributes a set of queries among several threads. 031 * 032 * @author Malte Isberner <malte.isberner@gmail.com> 033 * 034 * @param <I> input symbol class 035 * @param <O> output class 036 */ 037public class ParallelOracle<I, O> implements MembershipOracle<I, O> { 038 039 040 041 // TODO Does this number make sense? 042 public static int DEFAULT_MIN_BATCH_SIZE = 10; 043 044 private final int minBatchSize; 045 private final OracleWorker<I, O>[] workers; 046 private final Thread[] workerThreads; 047 private final MembershipOracle<I,O> thisThreadOracle; 048 049 /** 050 * Constructor for using (potentially) <i>separate</i> oracles for each worker thread, 051 * with a default minimum batch size. 052 * @param oracles the oracles to use for answering the queries. The cardinality of this 053 * list coincides with the number of threads created. 054 */ 055 public ParallelOracle(List<? extends MembershipOracle<I,O>> oracles) { 056 this(oracles, DEFAULT_MIN_BATCH_SIZE); 057 } 058 059 /** 060 * Constructor for using (potentially) <i>separate</i> oracles for each worker thread. 061 * @param oracles the oracles to use for answering the queries. The cardinality of this 062 * list coincides with the number of threads created. 063 * @param minBatchSize the minimum batch size 064 */ 065 @SuppressWarnings("unchecked") 066 public ParallelOracle(List<? extends MembershipOracle<I,O>> oracles, int minBatchSize) { 067 int numOracles = oracles.size(); 068 if(numOracles <= 0) 069 throw new IllegalArgumentException("Must provide at least one oracle"); 070 workers = new OracleWorker[numOracles - 1]; 071 workerThreads = new Thread[numOracles - 1]; 072 Iterator<? extends MembershipOracle<I,O>> it = oracles.iterator(); 073 thisThreadOracle = it.next(); 074 int i = 0; 075 while(it.hasNext()) { 076 MembershipOracle<I,O> oracle = it.next(); 077 OracleWorker<I,O> worker = new OracleWorker<>(oracle); 078 workers[i++] = worker; 079 } 080 this.minBatchSize = minBatchSize; 081 } 082 083 /** 084 * Constructor for using one <i>shared</i> oracle for all worker threads, 085 * with the default minimum batch size. 086 * @param sharedOracle the shared oracle 087 * @param numInstances the number of threads to create 088 */ 089 public ParallelOracle(MembershipOracle<I,O> sharedOracle, int numInstances) { 090 this(sharedOracle, numInstances, DEFAULT_MIN_BATCH_SIZE); 091 } 092 093 /** 094 * Constructor for using one <i>shared</i> oracle for all worker threads, 095 * with a custom minimum batch size. 096 * @param sharedOracle the shared oracle 097 * @param numInstances the number of threads to create 098 * @param minBatchSize the minimum batch size 099 */ 100 @SuppressWarnings("unchecked") 101 public ParallelOracle(MembershipOracle<I,O> sharedOracle, int numInstances, int minBatchSize) { 102 if(numInstances <= 0) 103 throw new IllegalArgumentException("Must have at least one oracle instance"); 104 numInstances--; 105 workers = new OracleWorker[numInstances]; 106 workerThreads = new Thread[numInstances]; 107 thisThreadOracle = sharedOracle; 108 for(int i = 0; i < numInstances; i++) { 109 OracleWorker<I,O> worker = new OracleWorker<>(sharedOracle); 110 workers[i] = worker; 111 } 112 this.minBatchSize = minBatchSize; 113 } 114 115 /** 116 * Starts all worker threads. 117 */ 118 public void start() { 119 if(workerThreads.length > 0 && workerThreads[0] != null) 120 throw new IllegalStateException("ParallelOracle already started"); 121 122 for(int i = 0; i < workers.length; i++) { 123 Thread t = new Thread(workers[i]); 124 workerThreads[i] = t; 125 t.start(); 126 } 127 } 128 129 /* 130 * (non-Javadoc) 131 * @see de.learnlib.api.MembershipOracle#processQueries(java.util.Collection) 132 */ 133 @Override 134 @SuppressWarnings("unchecked") 135 public void processQueries(Collection<? extends Query<I, O>> queries) { 136 if(workerThreads.length > 0 && workerThreads[0] == null) 137 throw new IllegalStateException("ParallelOracle was not started"); 138 139 int num = queries.size(); 140 if(num <= 0) 141 return; 142 143 int numBatches = (num - minBatchSize)/minBatchSize + 1; 144 if(numBatches > workers.length + 1) 145 numBatches = workers.length + 1; 146 147 // Calculate the number of full and non-full batches. The difference in size 148 // will never exceed one (cf. pidgeonhole principle) 149 int fullBatchSize = (num - 1)/numBatches + 1; 150 int nonFullBatches = fullBatchSize*numBatches - num; 151 152 Iterator<? extends Query<I,O>> queryIt = queries.iterator(); 153 154 // One batch is always executed in the local thread. This saves the thread creation 155 // overhead for the common case where the batch size is quite small. 156 int externalBatches = numBatches - 1; 157 158 // If we decide not to need any external threads, we can save initializing synchronization 159 // measures. 160 CountDownLatch finishSignal = (externalBatches > 0) ? new CountDownLatch(externalBatches) : null; 161 162 // Start the threads for the external batches 163 for(int i = 0; i < externalBatches; i++) { 164 int bs = fullBatchSize; 165 if(i < nonFullBatches) 166 bs--; 167 Query<I,O>[] batch = new Query[bs]; 168 for(int j = 0; j < bs; j++) 169 batch[j] = queryIt.next(); 170 171 workers[i].offerBatch(batch, finishSignal); 172 } 173 174 // Finally, prepare and process the batch for the oracle executed in this thread. 175 Query<I,O>[] batch = new Query[fullBatchSize]; 176 for(int j = 0; j < fullBatchSize; j++) 177 batch[j] = queryIt.next(); 178 179 thisThreadOracle.processQueries(Arrays.asList(batch)); 180 181 // FIXME: Needs deadlock prevention 182 if(finishSignal != null) { 183 try { 184 finishSignal.await(); 185 } catch (InterruptedException e) { 186 throw new IllegalStateException(e); 187 } 188 } 189 } 190 191 /** 192 * Stop all worker threads. After this method has been called, invoking {@link #processQueries(Collection)} 193 * will result in an {@link IllegalStateException} until the next call to {@link #start()} 194 * is made. 195 */ 196 public void stop() { 197 if(workerThreads.length > 0 && workerThreads[0] == null) 198 throw new IllegalStateException("Parallel oracle was not started"); 199 200 for(int i = 0; i < workers.length; i++) { 201 workers[i].stop(); 202 } 203 for(int i = 0; i < workerThreads.length; i++) { 204 try { 205 workerThreads[i].join(); 206 } catch (InterruptedException e) { 207 // TODO Auto-generated catch block 208 e.printStackTrace(); 209 } 210 workerThreads[i] = null; 211 } 212 } 213 214}