View Javadoc

1   /* ============================================================
2    * JRobin : Pure java implementation of RRDTool's functionality
3    * ============================================================
4    *  
5    * Project Info:  http://www.jrobin.org
6    * Project Lead:  Sasa Markovic (saxon@jrobin.org);
7    *
8    * (C) Copyright 2003-2005, by Sasa Markovic.
9    *
10   * Developers:    Sasa Markovic (saxon@jrobin.org)
11   *
12   *
13   * This library is free software; you can redistribute it and/or modify it under the terms
14   * of the GNU Lesser General Public License as published by the Free Software Foundation;
15   * either version 2.1 of the License, or (at your option) any later version.
16   *
17   * This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
18   * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
19   * See the GNU Lesser General Public License for more details.
20   *
21   * You should have received a copy of the GNU Lesser General Public License along with this
22   * library; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330,
23   * Boston, MA 02111-1307, USA.
24   */
25  
26  package org.jrobin.core;
27  
28  import java.io.IOException;
29  
30  /**
31   * Class to represent single RRD archive in a RRD with its internal state.
32   * Normally, you don't need methods to manipulate archive objects directly
33   * because JRobin framework does it automatically for you.<p>
34   * <p/>
35   * Each archive object consists of three parts: archive definition, archive state objects
36   * (one state object for each datasource) and round robin archives (one round robin for
37   * each datasource). API (read-only) is provided to access each of theese parts.<p>
38   *
39   * @author <a href="mailto:saxon@jrobin.org">Sasa Markovic</a>
40   */
41  public class Archive implements RrdUpdater, ConsolFuns {
42  	private RrdDb parentDb;
43  	// definition
44  	private RrdString consolFun;
45  	private RrdDouble xff;
46  	private RrdInt steps, rows;
47  	// state
48  	private Robin[] robins;
49  	private ArcState[] states;
50  
51  	Archive(RrdDb parentDb, ArcDef arcDef) throws IOException {
52  		boolean shouldInitialize = arcDef != null;
53  		this.parentDb = parentDb;
54  		consolFun = new RrdString(this, true);  // constant, may be cached
55  		xff = new RrdDouble(this);
56  		steps = new RrdInt(this, true);			// constant, may be cached
57  		rows = new RrdInt(this, true);			// constant, may be cached
58  		if (shouldInitialize) {
59  			consolFun.set(arcDef.getConsolFun());
60  			xff.set(arcDef.getXff());
61  			steps.set(arcDef.getSteps());
62  			rows.set(arcDef.getRows());
63  		}
64  		int n = parentDb.getHeader().getDsCount();
65  		states = new ArcState[n];
66  		robins = new Robin[n];
67  		for (int i = 0; i < n; i++) {
68  			states[i] = new ArcState(this, shouldInitialize);
69  			int numRows = rows.get();
70  			robins[i] = new Robin(this, numRows, shouldInitialize);
71  		}
72  	}
73  
74  	// read from XML
75  	Archive(RrdDb parentDb, DataImporter reader, int arcIndex) throws IOException, RrdException {
76  		this(parentDb, new ArcDef(
77  				reader.getConsolFun(arcIndex), reader.getXff(arcIndex),
78  				reader.getSteps(arcIndex), reader.getRows(arcIndex)));
79  		int n = parentDb.getHeader().getDsCount();
80  		for (int i = 0; i < n; i++) {
81  			// restore state
82  			states[i].setAccumValue(reader.getStateAccumValue(arcIndex, i));
83  			states[i].setNanSteps(reader.getStateNanSteps(arcIndex, i));
84  			// restore robins
85  			double[] values = reader.getValues(arcIndex, i);
86  			robins[i].update(values);
87  		}
88  	}
89  
90  	/**
91  	 * Returns archive time step in seconds. Archive step is equal to RRD step
92  	 * multiplied with the number of archive steps.
93  	 *
94  	 * @return Archive time step in seconds
95  	 * @throws IOException Thrown in case of I/O error.
96  	 */
97  	public long getArcStep() throws IOException {
98  		long step = parentDb.getHeader().getStep();
99  		return step * steps.get();
100 	}
101 
102 	String dump() throws IOException {
103 		StringBuffer buffer = new StringBuffer("== ARCHIVE ==\n");
104 		buffer.append("RRA:").append(consolFun.get()).append(":").append(xff.get()).append(":").append(steps.get()).
105 				append(":").append(rows.get()).append("\n");
106 		buffer.append("interval [").append(getStartTime()).append(", ").append(getEndTime()).append("]" + "\n");
107 		for (int i = 0; i < robins.length; i++) {
108 			buffer.append(states[i].dump());
109 			buffer.append(robins[i].dump());
110 		}
111 		return buffer.toString();
112 	}
113 
114 	RrdDb getParentDb() {
115 		return parentDb;
116 	}
117 
118 	void archive(int dsIndex, double value, long numUpdates) throws IOException {
119 		Robin robin = robins[dsIndex];
120 		ArcState state = states[dsIndex];
121 		long step = parentDb.getHeader().getStep();
122 		long lastUpdateTime = parentDb.getHeader().getLastUpdateTime();
123 		long updateTime = Util.normalize(lastUpdateTime, step) + step;
124 		long arcStep = getArcStep();
125 		// finish current step
126 		while (numUpdates > 0) {
127 			accumulate(state, value);
128 			numUpdates--;
129 			if (updateTime % arcStep == 0) {
130 				finalizeStep(state, robin);
131 				break;
132 			}
133 			else {
134 				updateTime += step;
135 			}
136 		}
137 		// update robin in bulk
138 		int bulkUpdateCount = (int) Math.min(numUpdates / steps.get(), (long) rows.get());
139 		robin.bulkStore(value, bulkUpdateCount);
140 		// update remaining steps
141 		long remainingUpdates = numUpdates % steps.get();
142 		for (long i = 0; i < remainingUpdates; i++) {
143 			accumulate(state, value);
144 		}
145 	}
146 
147 	private void accumulate(ArcState state, double value) throws IOException {
148 		if (Double.isNaN(value)) {
149 			state.setNanSteps(state.getNanSteps() + 1);
150 		}
151 		else {
152 			if (consolFun.get().equals(CF_MIN)) {
153 				state.setAccumValue(Util.min(state.getAccumValue(), value));
154 			}
155 			else if (consolFun.get().equals(CF_MAX)) {
156 				state.setAccumValue(Util.max(state.getAccumValue(), value));
157 			}
158 			else if (consolFun.get().equals(CF_LAST)) {
159 				state.setAccumValue(value);
160 			}
161 			else if (consolFun.get().equals(CF_AVERAGE)) {
162 				state.setAccumValue(Util.sum(state.getAccumValue(), value));
163 			}
164 		}
165 	}
166 
167 	private void finalizeStep(ArcState state, Robin robin) throws IOException {
168 		// should store
169 		long arcSteps = steps.get();
170 		double arcXff = xff.get();
171 		long nanSteps = state.getNanSteps();
172 		//double nanPct = (double) nanSteps / (double) arcSteps;
173 		double accumValue = state.getAccumValue();
174 		if (nanSteps <= arcXff * arcSteps && !Double.isNaN(accumValue)) {
175 			if (consolFun.get().equals(CF_AVERAGE)) {
176 				accumValue /= (arcSteps - nanSteps);
177 			}
178 			robin.store(accumValue);
179 		}
180 		else {
181 			robin.store(Double.NaN);
182 		}
183 		state.setAccumValue(Double.NaN);
184 		state.setNanSteps(0);
185 	}
186 
187 	/**
188 	 * Returns archive consolidation function ("AVERAGE", "MIN", "MAX" or "LAST").
189 	 *
190 	 * @return Archive consolidation function.
191 	 * @throws IOException Thrown in case of I/O error.
192 	 */
193 	public String getConsolFun() throws IOException {
194 		return consolFun.get();
195 	}
196 
197 	/**
198 	 * Returns archive X-files factor.
199 	 *
200 	 * @return Archive X-files factor (between 0 and 1).
201 	 * @throws IOException Thrown in case of I/O error.
202 	 */
203 	public double getXff() throws IOException {
204 		return xff.get();
205 	}
206 
207 	/**
208 	 * Returns the number of archive steps.
209 	 *
210 	 * @return Number of archive steps.
211 	 * @throws IOException Thrown in case of I/O error.
212 	 */
213 	public int getSteps() throws IOException {
214 		return steps.get();
215 	}
216 
217 	/**
218 	 * Returns the number of archive rows.
219 	 *
220 	 * @return Number of archive rows.
221 	 * @throws IOException Thrown in case of I/O error.
222 	 */
223 	public int getRows() throws IOException {
224 		return rows.get();
225 	}
226 
227 	/**
228 	 * Returns current starting timestamp. This value is not constant.
229 	 *
230 	 * @return Timestamp corresponding to the first archive row
231 	 * @throws IOException Thrown in case of I/O error.
232 	 */
233 	public long getStartTime() throws IOException {
234 		long endTime = getEndTime();
235 		long arcStep = getArcStep();
236 		long numRows = rows.get();
237 		return endTime - (numRows - 1) * arcStep;
238 	}
239 
240 	/**
241 	 * Returns current ending timestamp. This value is not constant.
242 	 *
243 	 * @return Timestamp corresponding to the last archive row
244 	 * @throws IOException Thrown in case of I/O error.
245 	 */
246 	public long getEndTime() throws IOException {
247 		long arcStep = getArcStep();
248 		long lastUpdateTime = parentDb.getHeader().getLastUpdateTime();
249 		return Util.normalize(lastUpdateTime, arcStep);
250 	}
251 
252 	/**
253 	 * Returns the underlying archive state object. Each datasource has its
254 	 * corresponding ArcState object (archive states are managed independently
255 	 * for each RRD datasource).
256 	 *
257 	 * @param dsIndex Datasource index
258 	 * @return Underlying archive state object
259 	 */
260 	public ArcState getArcState(int dsIndex) {
261 		return states[dsIndex];
262 	}
263 
264 	/**
265 	 * Returns the underlying round robin archive. Robins are used to store actual
266 	 * archive values on a per-datasource basis.
267 	 *
268 	 * @param dsIndex Index of the datasource in the RRD.
269 	 * @return Underlying round robin archive for the given datasource.
270 	 */
271 	public Robin getRobin(int dsIndex) {
272 		return robins[dsIndex];
273 	}
274 
275 	FetchData fetchData(FetchRequest request) throws IOException, RrdException {
276 		long arcStep = getArcStep();
277 		long fetchStart = Util.normalize(request.getFetchStart(), arcStep);
278 		long fetchEnd = Util.normalize(request.getFetchEnd(), arcStep);
279 		if (fetchEnd < request.getFetchEnd()) {
280 			fetchEnd += arcStep;
281 		}
282 		long startTime = getStartTime();
283 		long endTime = getEndTime();
284 		String[] dsToFetch = request.getFilter();
285 		if (dsToFetch == null) {
286 			dsToFetch = parentDb.getDsNames();
287 		}
288 		int dsCount = dsToFetch.length;
289 		int ptsCount = (int) ((fetchEnd - fetchStart) / arcStep + 1);
290 		long[] timestamps = new long[ptsCount];
291 		double[][] values = new double[dsCount][ptsCount];
292 		long matchStartTime = Math.max(fetchStart, startTime);
293 		long matchEndTime = Math.min(fetchEnd, endTime);
294 		double[][] robinValues = null;
295 		if (matchStartTime <= matchEndTime) {
296 			// preload robin values
297 			int matchCount = (int) ((matchEndTime - matchStartTime) / arcStep + 1);
298 			int matchStartIndex = (int) ((matchStartTime - startTime) / arcStep);
299 			robinValues = new double[dsCount][];
300 			for (int i = 0; i < dsCount; i++) {
301 				int dsIndex = parentDb.getDsIndex(dsToFetch[i]);
302 				robinValues[i] = robins[dsIndex].getValues(matchStartIndex, matchCount);
303 			}
304 		}
305 		for (int ptIndex = 0; ptIndex < ptsCount; ptIndex++) {
306 			long time = fetchStart + ptIndex * arcStep;
307 			timestamps[ptIndex] = time;
308 			for (int i = 0; i < dsCount; i++) {
309 				double value = Double.NaN;
310 				if (time >= matchStartTime && time <= matchEndTime) {
311 					// inbound time
312 					int robinValueIndex = (int) ((time - matchStartTime) / arcStep);
313 					assert robinValues != null;
314 					value = robinValues[i][robinValueIndex];
315 				}
316 				values[i][ptIndex] = value;
317 			}
318 		}
319 		FetchData fetchData = new FetchData(this, request);
320 		fetchData.setTimestamps(timestamps);
321 		fetchData.setValues(values);
322 		return fetchData;
323 	}
324 
325 	void appendXml(XmlWriter writer) throws IOException {
326 		writer.startTag("rra");
327 		writer.writeTag("cf", consolFun.get());
328 		writer.writeComment(getArcStep() + " seconds");
329 		writer.writeTag("pdp_per_row", steps.get());
330 		writer.writeTag("xff", xff.get());
331 		writer.startTag("cdp_prep");
332 		for (ArcState state : states) {
333 			state.appendXml(writer);
334 		}
335 		writer.closeTag(); // cdp_prep
336 		writer.startTag("database");
337 		long startTime = getStartTime();
338 		for (int i = 0; i < rows.get(); i++) {
339 			long time = startTime + i * getArcStep();
340 			writer.writeComment(Util.getDate(time) + " / " + time);
341 			writer.startTag("row");
342 			for (Robin robin : robins) {
343 				writer.writeTag("v", robin.getValue(i));
344 			}
345 			writer.closeTag(); // row
346 		}
347 		writer.closeTag(); // database
348 		writer.closeTag(); // rra
349 	}
350 
351 	/**
352 	 * Copies object's internal state to another Archive object.
353 	 *
354 	 * @param other New Archive object to copy state to
355 	 * @throws IOException  Thrown in case of I/O error
356 	 * @throws RrdException Thrown if supplied argument is not an Archive object
357 	 */
358 	public void copyStateTo(RrdUpdater other) throws IOException, RrdException {
359 		if (!(other instanceof Archive)) {
360 			throw new RrdException(
361 					"Cannot copy Archive object to " + other.getClass().getName());
362 		}
363 		Archive arc = (Archive) other;
364 		if (!arc.consolFun.get().equals(consolFun.get())) {
365 			throw new RrdException("Incompatible consolidation functions");
366 		}
367 		if (arc.steps.get() != steps.get()) {
368 			throw new RrdException("Incompatible number of steps");
369 		}
370 		int count = parentDb.getHeader().getDsCount();
371 		for (int i = 0; i < count; i++) {
372 			int j = Util.getMatchingDatasourceIndex(parentDb, i, arc.parentDb);
373 			if (j >= 0) {
374 				states[i].copyStateTo(arc.states[j]);
375 				robins[i].copyStateTo(arc.robins[j]);
376 			}
377 		}
378 	}
379 
380 	/**
381 	 * Sets X-files factor to a new value.
382 	 *
383 	 * @param xff New X-files factor value. Must be >= 0 and < 1.
384 	 * @throws RrdException Thrown if invalid value is supplied
385 	 * @throws IOException  Thrown in case of I/O error
386 	 */
387 	public void setXff(double xff) throws RrdException, IOException {
388 		if (xff < 0D || xff >= 1D) {
389 			throw new RrdException("Invalid xff supplied (" + xff + "), must be >= 0 and < 1");
390 		}
391 		this.xff.set(xff);
392 	}
393 
394 	/**
395 	 * Returns the underlying storage (backend) object which actually performs all
396 	 * I/O operations.
397 	 *
398 	 * @return I/O backend object
399 	 */
400 	public RrdBackend getRrdBackend() {
401 		return parentDb.getRrdBackend();
402 	}
403 
404 	/**
405 	 * Required to implement RrdUpdater interface. You should never call this method directly.
406 	 *
407 	 * @return Allocator object
408 	 */
409 	public RrdAllocator getRrdAllocator() {
410 		return parentDb.getRrdAllocator();
411 	}
412 }