View Javadoc

1   /* ============================================================
2    * JRobin : Pure java implementation of RRDTool's functionality
3    * ============================================================
4    *
5    * Project Info:  http://www.jrobin.org
6    * Project Lead:  Sasa Markovic (saxon@jrobin.org);
7    *
8    * (C) Copyright 2003-2005, by Sasa Markovic.
9    *
10   * Developers:    Sasa Markovic (saxon@jrobin.org)
11   *
12   *
13   * This library is free software; you can redistribute it and/or modify it under the terms
14   * of the GNU Lesser General Public License as published by the Free Software Foundation;
15   * either version 2.1 of the License, or (at your option) any later version.
16   *
17   * This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
18   * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
19   * See the GNU Lesser General Public License for more details.
20   *
21   * You should have received a copy of the GNU Lesser General Public License along with this
22   * library; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330,
23   * Boston, MA 02111-1307, USA.
24   */
25  
26  package org.jrobin.core;
27  
28  import java.io.IOException;
29  
30  /**
31   * Class to represent single datasource within RRD. Each datasource object holds the
32   * following information: datasource definition (once set, never changed) and
33   * datasource state variables (changed whenever RRD gets updated).<p>
34   * <p/>
35   * Normally, you don't need to manipluate Datasource objects directly, it's up to
36   * JRobin framework to do it for you.
37   *
38   * @author <a href="mailto:saxon@jrobin.org">Sasa Markovic</a>
39   */
40  
41  public class Datasource implements RrdUpdater, DsTypes {
42  	private static final double MAX_32_BIT = Math.pow(2, 32);
43  	private static final double MAX_64_BIT = Math.pow(2, 64);
44  
45  	private RrdDb parentDb;
46  	// definition
47  	private RrdString dsName, dsType;
48  	private RrdLong heartbeat;
49  	private RrdDouble minValue, maxValue;
50  
51  	// state variables
52  	private RrdDouble lastValue;
53  	private RrdLong nanSeconds;
54  	private RrdDouble accumValue;
55  
56  	Datasource(RrdDb parentDb, DsDef dsDef) throws IOException {
57  		boolean shouldInitialize = dsDef != null;
58  		this.parentDb = parentDb;
59  		dsName = new RrdString(this);
60  		dsType = new RrdString(this);
61  		heartbeat = new RrdLong(this);
62  		minValue = new RrdDouble(this);
63  		maxValue = new RrdDouble(this);
64  		lastValue = new RrdDouble(this);
65  		accumValue = new RrdDouble(this);
66  		nanSeconds = new RrdLong(this);
67  		if (shouldInitialize) {
68  			dsName.set(dsDef.getDsName());
69  			dsType.set(dsDef.getDsType());
70  			heartbeat.set(dsDef.getHeartbeat());
71  			minValue.set(dsDef.getMinValue());
72  			maxValue.set(dsDef.getMaxValue());
73  			lastValue.set(Double.NaN);
74  			accumValue.set(0.0);
75  			Header header = parentDb.getHeader();
76  			nanSeconds.set(header.getLastUpdateTime() % header.getStep());
77  		}
78  	}
79  
80  	Datasource(RrdDb parentDb, DataImporter reader, int dsIndex) throws IOException, RrdException {
81  		this(parentDb, null);
82  		dsName.set(reader.getDsName(dsIndex));
83  		dsType.set(reader.getDsType(dsIndex));
84  		heartbeat.set(reader.getHeartbeat(dsIndex));
85  		minValue.set(reader.getMinValue(dsIndex));
86  		maxValue.set(reader.getMaxValue(dsIndex));
87  		lastValue.set(reader.getLastValue(dsIndex));
88  		accumValue.set(reader.getAccumValue(dsIndex));
89  		nanSeconds.set(reader.getNanSeconds(dsIndex));
90  	}
91  
92  	String dump() throws IOException {
93  		return "== DATASOURCE ==\n" +
94  				"DS:" + dsName.get() + ":" + dsType.get() + ":" +
95  				heartbeat.get() + ":" + minValue.get() + ":" +
96  				maxValue.get() + "\nlastValue:" + lastValue.get() +
97  				" nanSeconds:" + nanSeconds.get() +
98  				" accumValue:" + accumValue.get() + "\n";
99  	}
100 
101 	/**
102 	 * Returns datasource name.
103 	 *
104 	 * @return Datasource name
105 	 * @throws IOException Thrown in case of I/O error
106 	 */
107 	public String getDsName() throws IOException {
108 		return dsName.get();
109 	}
110 
111 	/**
112 	 * Returns datasource type (GAUGE, COUNTER, DERIVE, ABSOLUTE).
113 	 *
114 	 * @return Datasource type.
115 	 * @throws IOException Thrown in case of I/O error
116 	 */
117 	public String getDsType() throws IOException {
118 		return dsType.get();
119 	}
120 
121 	/**
122 	 * Returns datasource heartbeat
123 	 *
124 	 * @return Datasource heartbeat
125 	 * @throws IOException Thrown in case of I/O error
126 	 */
127 
128 	public long getHeartbeat() throws IOException {
129 		return heartbeat.get();
130 	}
131 
132 	/**
133 	 * Returns mimimal allowed value for this datasource.
134 	 *
135 	 * @return Minimal value allowed.
136 	 * @throws IOException Thrown in case of I/O error
137 	 */
138 	public double getMinValue() throws IOException {
139 		return minValue.get();
140 	}
141 
142 	/**
143 	 * Returns maximal allowed value for this datasource.
144 	 *
145 	 * @return Maximal value allowed.
146 	 * @throws IOException Thrown in case of I/O error
147 	 */
148 	public double getMaxValue() throws IOException {
149 		return maxValue.get();
150 	}
151 
152 	/**
153 	 * Returns last known value of the datasource.
154 	 *
155 	 * @return Last datasource value.
156 	 * @throws IOException Thrown in case of I/O error
157 	 */
158 	public double getLastValue() throws IOException {
159 		return lastValue.get();
160 	}
161 
162 	/**
163 	 * Returns value this datasource accumulated so far.
164 	 *
165 	 * @return Accumulated datasource value.
166 	 * @throws IOException Thrown in case of I/O error
167 	 */
168 	public double getAccumValue() throws IOException {
169 		return accumValue.get();
170 	}
171 
172 	/**
173 	 * Returns the number of accumulated NaN seconds.
174 	 *
175 	 * @return Accumulated NaN seconds.
176 	 * @throws IOException Thrown in case of I/O error
177 	 */
178 	public long getNanSeconds() throws IOException {
179 		return nanSeconds.get();
180 	}
181 
182 	void process(long newTime, double newValue) throws IOException, RrdException {
183 		Header header = parentDb.getHeader();
184 		long step = header.getStep();
185 		long oldTime = header.getLastUpdateTime();
186 		long startTime = Util.normalize(oldTime, step);
187 		long endTime = startTime + step;
188 		double oldValue = lastValue.get();
189 		double updateValue = calculateUpdateValue(oldTime, oldValue, newTime, newValue);
190 		if (newTime < endTime) {
191 			accumulate(oldTime, newTime, updateValue);
192 		}
193 		else {
194 			// should store something
195 			long boundaryTime = Util.normalize(newTime, step);
196 			accumulate(oldTime, boundaryTime, updateValue);
197 			double value = calculateTotal(startTime, boundaryTime);
198 			// how many updates?
199 			long numSteps = (boundaryTime - endTime) / step + 1L;
200 			// ACTION!
201 			parentDb.archive(this, value, numSteps);
202 			// cleanup
203 			nanSeconds.set(0);
204 			accumValue.set(0.0);
205 			accumulate(boundaryTime, newTime, updateValue);
206 		}
207 	}
208 
209 	private double calculateUpdateValue(long oldTime, double oldValue,
210 										long newTime, double newValue) throws IOException {
211 		double updateValue = Double.NaN;
212 		if (newTime - oldTime <= heartbeat.get()) {
213 			String type = dsType.get();
214 			if (type.equals(DT_GAUGE)) {
215 				updateValue = newValue;
216 			}
217 			else if (type.equals(DT_ABSOLUTE)) {
218 				if (!Double.isNaN(newValue)) {
219 					updateValue = newValue / (newTime - oldTime);
220 				}
221 			}
222 			else if (type.equals(DT_DERIVE)) {
223 				if (!Double.isNaN(newValue) && !Double.isNaN(oldValue)) {
224 					updateValue = (newValue - oldValue) / (newTime - oldTime);
225 				}
226 			}
227 			else if (type.equals(DT_COUNTER)) {
228 				if (!Double.isNaN(newValue) && !Double.isNaN(oldValue)) {
229 					double diff = newValue - oldValue;
230 					if (diff < 0) {
231 						diff += MAX_32_BIT;
232 					}
233 					if (diff < 0) {
234 						diff += MAX_64_BIT - MAX_32_BIT;
235 					}
236 					if (diff >= 0) {
237 						updateValue = diff / (newTime - oldTime);
238 					}
239 				}
240 			}
241 			if (!Double.isNaN(updateValue)) {
242 				double minVal = minValue.get();
243 				double maxVal = maxValue.get();
244 				if (!Double.isNaN(minVal) && updateValue < minVal) {
245 					updateValue = Double.NaN;
246 				}
247 				if (!Double.isNaN(maxVal) && updateValue > maxVal) {
248 					updateValue = Double.NaN;
249 				}
250 			}
251 		}
252 		lastValue.set(newValue);
253 		return updateValue;
254 	}
255 
256 	private void accumulate(long oldTime, long newTime, double updateValue) throws IOException {
257 		if (Double.isNaN(updateValue)) {
258 			nanSeconds.set(nanSeconds.get() + (newTime - oldTime));
259 		}
260 		else {
261 			accumValue.set(accumValue.get() + updateValue * (newTime - oldTime));
262 		}
263 	}
264 
265 	private double calculateTotal(long startTime, long boundaryTime) throws IOException {
266 		double totalValue = Double.NaN;
267 		long validSeconds = boundaryTime - startTime - nanSeconds.get();
268 		if (nanSeconds.get() <= heartbeat.get() && validSeconds > 0) {
269 			totalValue = accumValue.get() / validSeconds;
270 		}
271 		// IMPORTANT:
272 		// if datasource name ends with "!", we'll send zeros instead of NaNs
273 		// this might be handy from time to time
274 		if (Double.isNaN(totalValue) && dsName.get().endsWith(DsDef.FORCE_ZEROS_FOR_NANS_SUFFIX)) {
275 			totalValue = 0D;
276 		}
277 		return totalValue;
278 	}
279 
280 	void appendXml(XmlWriter writer) throws IOException {
281 		writer.startTag("ds");
282 		writer.writeTag("name", dsName.get());
283 		writer.writeTag("type", dsType.get());
284 		writer.writeTag("minimal_heartbeat", heartbeat.get());
285 		writer.writeTag("min", minValue.get());
286 		writer.writeTag("max", maxValue.get());
287 		writer.writeComment("PDP Status");
288 		writer.writeTag("last_ds", lastValue.get(), "UNKN");
289 		writer.writeTag("value", accumValue.get());
290 		writer.writeTag("unknown_sec", nanSeconds.get());
291 		writer.closeTag();  // ds
292 	}
293 
294 	/**
295 	 * Copies object's internal state to another Datasource object.
296 	 *
297 	 * @param other New Datasource object to copy state to
298 	 * @throws IOException  Thrown in case of I/O error
299 	 * @throws RrdException Thrown if supplied argument is not a Datasource object
300 	 */
301 	public void copyStateTo(RrdUpdater other) throws IOException, RrdException {
302 		if (!(other instanceof Datasource)) {
303 			throw new RrdException(
304 					"Cannot copy Datasource object to " + other.getClass().getName());
305 		}
306 		Datasource datasource = (Datasource) other;
307 		if (!datasource.dsName.get().equals(dsName.get())) {
308 			throw new RrdException("Incomaptible datasource names");
309 		}
310 		if (!datasource.dsType.get().equals(dsType.get())) {
311 			throw new RrdException("Incomaptible datasource types");
312 		}
313 		datasource.lastValue.set(lastValue.get());
314 		datasource.nanSeconds.set(nanSeconds.get());
315 		datasource.accumValue.set(accumValue.get());
316 	}
317 
318 	/**
319 	 * Returns index of this Datasource object in the RRD.
320 	 *
321 	 * @return Datasource index in the RRD.
322 	 * @throws IOException Thrown in case of I/O error
323 	 */
324 	public int getDsIndex() throws IOException {
325 		try {
326 			return parentDb.getDsIndex(dsName.get());
327 		}
328 		catch (RrdException e) {
329 			return -1;
330 		}
331 	}
332 
333 	/**
334 	 * Sets datasource heartbeat to a new value.
335 	 *
336 	 * @param heartbeat New heartbeat value
337 	 * @throws IOException  Thrown in case of I/O error
338 	 * @throws RrdException Thrown if invalid (non-positive) heartbeat value is specified.
339 	 */
340 	public void setHeartbeat(long heartbeat) throws RrdException, IOException {
341 		if (heartbeat < 1L) {
342 			throw new RrdException("Invalid heartbeat specified: " + heartbeat);
343 		}
344 		this.heartbeat.set(heartbeat);
345 	}
346 
347 	/**
348 	 * Sets datasource name to a new value
349 	 *
350 	 * @param newDsName New datasource name
351 	 * @throws RrdException Thrown if invalid data source name is specified (name too long, or
352 	 *                      name already defined in the RRD
353 	 * @throws IOException  Thrown in case of I/O error
354 	 */
355 	public void setDsName(String newDsName) throws RrdException, IOException {
356 		if (newDsName.length() > RrdString.STRING_LENGTH) {
357 			throw new RrdException("Invalid datasource name specified: " + newDsName);
358 		}
359 		if (parentDb.containsDs(newDsName)) {
360 			throw new RrdException("Datasource already defined in this RRD: " + newDsName);
361 		}
362 		this.dsName.set(newDsName);
363 	}
364 
365 	public void setDsType(String newDsType) throws RrdException, IOException {
366 		if (!DsDef.isValidDsType(newDsType)) {
367 			throw new RrdException("Invalid datasource type: " + newDsType);
368 		}
369 		// set datasource type
370 		this.dsType.set(newDsType);
371 		// reset datasource status
372 		lastValue.set(Double.NaN);
373 		accumValue.set(0.0);
374 		// reset archive status
375 		int dsIndex = parentDb.getDsIndex(dsName.get());
376 		Archive[] archives = parentDb.getArchives();
377 		for (Archive archive : archives) {
378 			archive.getArcState(dsIndex).setAccumValue(Double.NaN);
379 		}
380 	}
381 
382 	/**
383 	 * Sets minimum allowed value for this datasource. If <code>filterArchivedValues</code>
384 	 * argment is set to true, all archived values less then <code>minValue</code> will
385 	 * be fixed to NaN.
386 	 *
387 	 * @param minValue			 New minimal value. Specify <code>Double.NaN</code> if no minimal
388 	 *                             value should be set
389 	 * @param filterArchivedValues true, if archived datasource values should be fixed;
390 	 *                             false, otherwise.
391 	 * @throws IOException  Thrown in case of I/O error
392 	 * @throws RrdException Thrown if invalid minValue was supplied (not less then maxValue)
393 	 */
394 	public void setMinValue(double minValue, boolean filterArchivedValues)
395 			throws IOException, RrdException {
396 		double maxValue = this.maxValue.get();
397 		if (!Double.isNaN(minValue) && !Double.isNaN(maxValue) && minValue >= maxValue) {
398 			throw new RrdException("Invalid min/max values: " + minValue + "/" + maxValue);
399 		}
400 		this.minValue.set(minValue);
401 		if (!Double.isNaN(minValue) && filterArchivedValues) {
402 			int dsIndex = getDsIndex();
403 			Archive[] archives = parentDb.getArchives();
404 			for (Archive archive : archives) {
405 				archive.getRobin(dsIndex).filterValues(minValue, Double.NaN);
406 			}
407 		}
408 	}
409 
410 	/**
411 	 * Sets maximum allowed value for this datasource. If <code>filterArchivedValues</code>
412 	 * argment is set to true, all archived values greater then <code>maxValue</code> will
413 	 * be fixed to NaN.
414 	 *
415 	 * @param maxValue			 New maximal value. Specify <code>Double.NaN</code> if no max
416 	 *                             value should be set.
417 	 * @param filterArchivedValues true, if archived datasource values should be fixed;
418 	 *                             false, otherwise.
419 	 * @throws IOException  Thrown in case of I/O error
420 	 * @throws RrdException Thrown if invalid maxValue was supplied (not greater then minValue)
421 	 */
422 	public void setMaxValue(double maxValue, boolean filterArchivedValues)
423 			throws IOException, RrdException {
424 		double minValue = this.minValue.get();
425 		if (!Double.isNaN(minValue) && !Double.isNaN(maxValue) && minValue >= maxValue) {
426 			throw new RrdException("Invalid min/max values: " + minValue + "/" + maxValue);
427 		}
428 		this.maxValue.set(maxValue);
429 		if (!Double.isNaN(maxValue) && filterArchivedValues) {
430 			int dsIndex = getDsIndex();
431 			Archive[] archives = parentDb.getArchives();
432 			for (Archive archive : archives) {
433 				archive.getRobin(dsIndex).filterValues(Double.NaN, maxValue);
434 			}
435 		}
436 	}
437 
438 	/**
439 	 * Sets min/max values allowed for this datasource. If <code>filterArchivedValues</code>
440 	 * argment is set to true, all archived values less then <code>minValue</code> or
441 	 * greater then <code>maxValue</code> will be fixed to NaN.
442 	 *
443 	 * @param minValue			 New minimal value. Specify <code>Double.NaN</code> if no min
444 	 *                             value should be set.
445 	 * @param maxValue			 New maximal value. Specify <code>Double.NaN</code> if no max
446 	 *                             value should be set.
447 	 * @param filterArchivedValues true, if archived datasource values should be fixed;
448 	 *                             false, otherwise.
449 	 * @throws IOException  Thrown in case of I/O error
450 	 * @throws RrdException Thrown if invalid min/max values were supplied
451 	 */
452 	public void setMinMaxValue(double minValue, double maxValue, boolean filterArchivedValues)
453 			throws IOException, RrdException {
454 		if (!Double.isNaN(minValue) && !Double.isNaN(maxValue) && minValue >= maxValue) {
455 			throw new RrdException("Invalid min/max values: " + minValue + "/" + maxValue);
456 		}
457 		this.minValue.set(minValue);
458 		this.maxValue.set(maxValue);
459 		if (!(Double.isNaN(minValue) && Double.isNaN(maxValue)) && filterArchivedValues) {
460 			int dsIndex = getDsIndex();
461 			Archive[] archives = parentDb.getArchives();
462 			for (Archive archive : archives) {
463 				archive.getRobin(dsIndex).filterValues(minValue, maxValue);
464 			}
465 		}
466 	}
467 
468 	/**
469 	 * Returns the underlying storage (backend) object which actually performs all
470 	 * I/O operations.
471 	 *
472 	 * @return I/O backend object
473 	 */
474 	public RrdBackend getRrdBackend() {
475 		return parentDb.getRrdBackend();
476 	}
477 
478 	/**
479 	 * Required to implement RrdUpdater interface. You should never call this method directly.
480 	 *
481 	 * @return Allocator object
482 	 */
483 	public RrdAllocator getRrdAllocator() {
484 		return parentDb.getRrdAllocator();
485 	}
486 }
487