1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 package org.jrobin.core;
27
28 import java.io.IOException;
29
30
31
32
33
34
35
36
37
38
39
40
41 public class Datasource implements RrdUpdater, DsTypes {
42 private static final double MAX_32_BIT = Math.pow(2, 32);
43 private static final double MAX_64_BIT = Math.pow(2, 64);
44
45 private RrdDb parentDb;
46
47 private RrdString dsName, dsType;
48 private RrdLong heartbeat;
49 private RrdDouble minValue, maxValue;
50
51
52 private RrdDouble lastValue;
53 private RrdLong nanSeconds;
54 private RrdDouble accumValue;
55
56 Datasource(RrdDb parentDb, DsDef dsDef) throws IOException {
57 boolean shouldInitialize = dsDef != null;
58 this.parentDb = parentDb;
59 dsName = new RrdString(this);
60 dsType = new RrdString(this);
61 heartbeat = new RrdLong(this);
62 minValue = new RrdDouble(this);
63 maxValue = new RrdDouble(this);
64 lastValue = new RrdDouble(this);
65 accumValue = new RrdDouble(this);
66 nanSeconds = new RrdLong(this);
67 if (shouldInitialize) {
68 dsName.set(dsDef.getDsName());
69 dsType.set(dsDef.getDsType());
70 heartbeat.set(dsDef.getHeartbeat());
71 minValue.set(dsDef.getMinValue());
72 maxValue.set(dsDef.getMaxValue());
73 lastValue.set(Double.NaN);
74 accumValue.set(0.0);
75 Header header = parentDb.getHeader();
76 nanSeconds.set(header.getLastUpdateTime() % header.getStep());
77 }
78 }
79
80 Datasource(RrdDb parentDb, DataImporter reader, int dsIndex) throws IOException, RrdException {
81 this(parentDb, null);
82 dsName.set(reader.getDsName(dsIndex));
83 dsType.set(reader.getDsType(dsIndex));
84 heartbeat.set(reader.getHeartbeat(dsIndex));
85 minValue.set(reader.getMinValue(dsIndex));
86 maxValue.set(reader.getMaxValue(dsIndex));
87 lastValue.set(reader.getLastValue(dsIndex));
88 accumValue.set(reader.getAccumValue(dsIndex));
89 nanSeconds.set(reader.getNanSeconds(dsIndex));
90 }
91
92 String dump() throws IOException {
93 return "== DATASOURCE ==\n" +
94 "DS:" + dsName.get() + ":" + dsType.get() + ":" +
95 heartbeat.get() + ":" + minValue.get() + ":" +
96 maxValue.get() + "\nlastValue:" + lastValue.get() +
97 " nanSeconds:" + nanSeconds.get() +
98 " accumValue:" + accumValue.get() + "\n";
99 }
100
101
102
103
104
105
106
107 public String getDsName() throws IOException {
108 return dsName.get();
109 }
110
111
112
113
114
115
116
117 public String getDsType() throws IOException {
118 return dsType.get();
119 }
120
121
122
123
124
125
126
127
128 public long getHeartbeat() throws IOException {
129 return heartbeat.get();
130 }
131
132
133
134
135
136
137
138 public double getMinValue() throws IOException {
139 return minValue.get();
140 }
141
142
143
144
145
146
147
148 public double getMaxValue() throws IOException {
149 return maxValue.get();
150 }
151
152
153
154
155
156
157
158 public double getLastValue() throws IOException {
159 return lastValue.get();
160 }
161
162
163
164
165
166
167
168 public double getAccumValue() throws IOException {
169 return accumValue.get();
170 }
171
172
173
174
175
176
177
178 public long getNanSeconds() throws IOException {
179 return nanSeconds.get();
180 }
181
182 void process(long newTime, double newValue) throws IOException, RrdException {
183 Header header = parentDb.getHeader();
184 long step = header.getStep();
185 long oldTime = header.getLastUpdateTime();
186 long startTime = Util.normalize(oldTime, step);
187 long endTime = startTime + step;
188 double oldValue = lastValue.get();
189 double updateValue = calculateUpdateValue(oldTime, oldValue, newTime, newValue);
190 if (newTime < endTime) {
191 accumulate(oldTime, newTime, updateValue);
192 }
193 else {
194
195 long boundaryTime = Util.normalize(newTime, step);
196 accumulate(oldTime, boundaryTime, updateValue);
197 double value = calculateTotal(startTime, boundaryTime);
198
199 long numSteps = (boundaryTime - endTime) / step + 1L;
200
201 parentDb.archive(this, value, numSteps);
202
203 nanSeconds.set(0);
204 accumValue.set(0.0);
205 accumulate(boundaryTime, newTime, updateValue);
206 }
207 }
208
209 private double calculateUpdateValue(long oldTime, double oldValue,
210 long newTime, double newValue) throws IOException {
211 double updateValue = Double.NaN;
212 if (newTime - oldTime <= heartbeat.get()) {
213 String type = dsType.get();
214 if (type.equals(DT_GAUGE)) {
215 updateValue = newValue;
216 }
217 else if (type.equals(DT_ABSOLUTE)) {
218 if (!Double.isNaN(newValue)) {
219 updateValue = newValue / (newTime - oldTime);
220 }
221 }
222 else if (type.equals(DT_DERIVE)) {
223 if (!Double.isNaN(newValue) && !Double.isNaN(oldValue)) {
224 updateValue = (newValue - oldValue) / (newTime - oldTime);
225 }
226 }
227 else if (type.equals(DT_COUNTER)) {
228 if (!Double.isNaN(newValue) && !Double.isNaN(oldValue)) {
229 double diff = newValue - oldValue;
230 if (diff < 0) {
231 diff += MAX_32_BIT;
232 }
233 if (diff < 0) {
234 diff += MAX_64_BIT - MAX_32_BIT;
235 }
236 if (diff >= 0) {
237 updateValue = diff / (newTime - oldTime);
238 }
239 }
240 }
241 if (!Double.isNaN(updateValue)) {
242 double minVal = minValue.get();
243 double maxVal = maxValue.get();
244 if (!Double.isNaN(minVal) && updateValue < minVal) {
245 updateValue = Double.NaN;
246 }
247 if (!Double.isNaN(maxVal) && updateValue > maxVal) {
248 updateValue = Double.NaN;
249 }
250 }
251 }
252 lastValue.set(newValue);
253 return updateValue;
254 }
255
256 private void accumulate(long oldTime, long newTime, double updateValue) throws IOException {
257 if (Double.isNaN(updateValue)) {
258 nanSeconds.set(nanSeconds.get() + (newTime - oldTime));
259 }
260 else {
261 accumValue.set(accumValue.get() + updateValue * (newTime - oldTime));
262 }
263 }
264
265 private double calculateTotal(long startTime, long boundaryTime) throws IOException {
266 double totalValue = Double.NaN;
267 long validSeconds = boundaryTime - startTime - nanSeconds.get();
268 if (nanSeconds.get() <= heartbeat.get() && validSeconds > 0) {
269 totalValue = accumValue.get() / validSeconds;
270 }
271
272
273
274 if (Double.isNaN(totalValue) && dsName.get().endsWith(DsDef.FORCE_ZEROS_FOR_NANS_SUFFIX)) {
275 totalValue = 0D;
276 }
277 return totalValue;
278 }
279
280 void appendXml(XmlWriter writer) throws IOException {
281 writer.startTag("ds");
282 writer.writeTag("name", dsName.get());
283 writer.writeTag("type", dsType.get());
284 writer.writeTag("minimal_heartbeat", heartbeat.get());
285 writer.writeTag("min", minValue.get());
286 writer.writeTag("max", maxValue.get());
287 writer.writeComment("PDP Status");
288 writer.writeTag("last_ds", lastValue.get(), "UNKN");
289 writer.writeTag("value", accumValue.get());
290 writer.writeTag("unknown_sec", nanSeconds.get());
291 writer.closeTag();
292 }
293
294
295
296
297
298
299
300
301 public void copyStateTo(RrdUpdater other) throws IOException, RrdException {
302 if (!(other instanceof Datasource)) {
303 throw new RrdException(
304 "Cannot copy Datasource object to " + other.getClass().getName());
305 }
306 Datasource datasource = (Datasource) other;
307 if (!datasource.dsName.get().equals(dsName.get())) {
308 throw new RrdException("Incomaptible datasource names");
309 }
310 if (!datasource.dsType.get().equals(dsType.get())) {
311 throw new RrdException("Incomaptible datasource types");
312 }
313 datasource.lastValue.set(lastValue.get());
314 datasource.nanSeconds.set(nanSeconds.get());
315 datasource.accumValue.set(accumValue.get());
316 }
317
318
319
320
321
322
323
324 public int getDsIndex() throws IOException {
325 try {
326 return parentDb.getDsIndex(dsName.get());
327 }
328 catch (RrdException e) {
329 return -1;
330 }
331 }
332
333
334
335
336
337
338
339
340 public void setHeartbeat(long heartbeat) throws RrdException, IOException {
341 if (heartbeat < 1L) {
342 throw new RrdException("Invalid heartbeat specified: " + heartbeat);
343 }
344 this.heartbeat.set(heartbeat);
345 }
346
347
348
349
350
351
352
353
354
355 public void setDsName(String newDsName) throws RrdException, IOException {
356 if (newDsName.length() > RrdString.STRING_LENGTH) {
357 throw new RrdException("Invalid datasource name specified: " + newDsName);
358 }
359 if (parentDb.containsDs(newDsName)) {
360 throw new RrdException("Datasource already defined in this RRD: " + newDsName);
361 }
362 this.dsName.set(newDsName);
363 }
364
365 public void setDsType(String newDsType) throws RrdException, IOException {
366 if (!DsDef.isValidDsType(newDsType)) {
367 throw new RrdException("Invalid datasource type: " + newDsType);
368 }
369
370 this.dsType.set(newDsType);
371
372 lastValue.set(Double.NaN);
373 accumValue.set(0.0);
374
375 int dsIndex = parentDb.getDsIndex(dsName.get());
376 Archive[] archives = parentDb.getArchives();
377 for (Archive archive : archives) {
378 archive.getArcState(dsIndex).setAccumValue(Double.NaN);
379 }
380 }
381
382
383
384
385
386
387
388
389
390
391
392
393
394 public void setMinValue(double minValue, boolean filterArchivedValues)
395 throws IOException, RrdException {
396 double maxValue = this.maxValue.get();
397 if (!Double.isNaN(minValue) && !Double.isNaN(maxValue) && minValue >= maxValue) {
398 throw new RrdException("Invalid min/max values: " + minValue + "/" + maxValue);
399 }
400 this.minValue.set(minValue);
401 if (!Double.isNaN(minValue) && filterArchivedValues) {
402 int dsIndex = getDsIndex();
403 Archive[] archives = parentDb.getArchives();
404 for (Archive archive : archives) {
405 archive.getRobin(dsIndex).filterValues(minValue, Double.NaN);
406 }
407 }
408 }
409
410
411
412
413
414
415
416
417
418
419
420
421
422 public void setMaxValue(double maxValue, boolean filterArchivedValues)
423 throws IOException, RrdException {
424 double minValue = this.minValue.get();
425 if (!Double.isNaN(minValue) && !Double.isNaN(maxValue) && minValue >= maxValue) {
426 throw new RrdException("Invalid min/max values: " + minValue + "/" + maxValue);
427 }
428 this.maxValue.set(maxValue);
429 if (!Double.isNaN(maxValue) && filterArchivedValues) {
430 int dsIndex = getDsIndex();
431 Archive[] archives = parentDb.getArchives();
432 for (Archive archive : archives) {
433 archive.getRobin(dsIndex).filterValues(Double.NaN, maxValue);
434 }
435 }
436 }
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452 public void setMinMaxValue(double minValue, double maxValue, boolean filterArchivedValues)
453 throws IOException, RrdException {
454 if (!Double.isNaN(minValue) && !Double.isNaN(maxValue) && minValue >= maxValue) {
455 throw new RrdException("Invalid min/max values: " + minValue + "/" + maxValue);
456 }
457 this.minValue.set(minValue);
458 this.maxValue.set(maxValue);
459 if (!(Double.isNaN(minValue) && Double.isNaN(maxValue)) && filterArchivedValues) {
460 int dsIndex = getDsIndex();
461 Archive[] archives = parentDb.getArchives();
462 for (Archive archive : archives) {
463 archive.getRobin(dsIndex).filterValues(minValue, maxValue);
464 }
465 }
466 }
467
468
469
470
471
472
473
474 public RrdBackend getRrdBackend() {
475 return parentDb.getRrdBackend();
476 }
477
478
479
480
481
482
483 public RrdAllocator getRrdAllocator() {
484 return parentDb.getRrdAllocator();
485 }
486 }
487