XRootD
XrdOfsCksFile.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d O f s C k s F i l e . c c */
4 /* */
5 /* (c) 2026 by the Board of Trustees of the Leland Stanford, Jr., University */
6 /* All Rights Reserved */
7 /* Produced by Andrew Hanushevsky for Stanford University under contract */
8 /* DE-AC02-76-SFO0515 with the Department of Energy */
9 /* */
10 /* This file is part of the XRootD software suite. */
11 /* */
12 /* XRootD is free software: you can redistribute it and/or modify it under */
13 /* the terms of the GNU Lesser General Public License as published by the */
14 /* Free Software Foundation, either version 3 of the License, or (at your */
15 /* option) any later version. */
16 /* */
17 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20 /* License for more details. */
21 /* */
22 /* You should have received a copy of the GNU Lesser General Public License */
23 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25 /* */
26 /* The copyright holder's institutional names and contributor's names may not */
27 /* be used to endorse or promote products derived from this software without */
28 /* specific prior written permission of the institution or contributor. */
29 /******************************************************************************/
30 
31 #include <stdlib.h>
32 #include <netinet/in.h>
33 #include <sys/param.h>
34 
35 #include "XrdCks/XrdCks.hh"
36 #include "XrdCks/XrdCksCalc.hh"
37 #include "XrdCks/XrdCksData.hh"
38 #include "XrdOfs/XrdOfsCksFile.hh"
39 #include "XrdOuc/XrdOucEnv.hh"
40 #include "XrdSfs/XrdSfsAio.hh"
41 #include "XrdSys/XrdSysError.hh"
42 
43 /******************************************************************************/
44 /* s t a t i c o b j e c t s */
45 /******************************************************************************/
46 
47 extern XrdSysError OfsEroute;
48 
49 namespace
50 {
52  XrdCks* cksP = 0;
53 }
54 
55 /******************************************************************************/
56 /* C o n s t r u c t o r */
57 /******************************************************************************/
58 
59 XrdOfsCksFile::XrdOfsCksFile(const char* tid, const char* path, XrdOssDF* df,
60  XrdCksCalc* cP, bool& delF)
61  : XrdOssWrapDF(*df),
62  tident(tid), fPath(strdup(path)), ossDF(df),
63  calcP(cP), altcP(0), viaDel(delF), nextOff(0),
64  ioBuff(0)
65 {
66 // Obtain information about the chacksum we are to use. It should have been
67 // pre-screened for viability, but we check it again just to make sure and
68 // to setup the proper execution path.
69 //
70  int rc, sz;
71  char eBuff[128];
72 
73 // Set the name and get the size of the checksum
74 //
75  cksName = cP->Type(sz);
76  Dirty = false;
77 
78 
79 // Determine how we will compute the real-time checksum. It will either be
80 // either combinable via a re-read computation.
81 //
82  if (cP->Combinable() && (sz == (int)sizeof(uint32_t)))
83  {ProcessRTC = &XrdOfsCksFile::RTC_CB32;
84  altcP = calcP->New();
85  } else {
86  ProcessRTC = &XrdOfsCksFile::RTC_NCXX;
87  if ((rc = posix_memalign(&ioBuff, 4096, ioBlen)))
88  {snprintf(eBuff, sizeof(eBuff),
89  "get buffer for real-time %s checksum for", cksName);
90  eLog.Emsg("ckscon", rc, eBuff, path);
91  Dirty = true;
92  }
93  }
94 }
95 
96 /******************************************************************************/
97 /* D e s t r u c t o r */
98 /******************************************************************************/
99 
101 {
102 // Cleanup
103 //
104  if (ossDF) delete ossDF;
105  if (calcP) calcP->Recycle();
106  if (altcP) altcP->Recycle();
107  if (fPath) free(fPath);
108  if (ioBuff) free(ioBuff);
109 }
110 
111 /******************************************************************************/
112 /* c l o s e */
113 /******************************************************************************/
114 
115 /*
116  Function: Close the file associated with this object.
117 
118  Input: None.
119 
120  Output: Returns XrdOssOK upon success and -1 upon failure.
121 */
122 int XrdOfsCksFile::Close(long long *retsz)
123 {
124  XrdCksData cksData;
125  struct stat Stat;
126  int csSize, rc;;
127 
128 // Process checksum if it is valid
129 //
130  cksMtx.Lock();
131  while(!Dirty) // This is not a loop but avoids deeply next if's.
132  {char eBuff[256];
133 
134  // If we are here vecause of a delete, skip setting checksum
135  //
136  if (viaDel)
137  {snprintf(eBuff, sizeof(eBuff), "File not properly closed; "
138  "real-time %s checksum was not set for", cksName);
139  eLog.Emsg("ckscls", eBuff, fPath);
140  break;
141  }
142 
143  // Verify that checksum was fully calculated
144  //
145 
146  // Verify that all data has been written for this checksum
147  //
148  if (segMap.size())
149  {auto it = segMap.begin();
150  snprintf(eBuff, sizeof(eBuff),
151  "%lld bytes missing at offset %lld; real-time %s",
152  (long long)(it->second.segBeg - nextOff),
153  (long long)nextOff, cksName);
154  eLog.Emsg("ckcls", eBuff, "checksum was not set for", fPath);
155  break;
156  }
157 
158  // Fill out the checksum information
159  //
160  memset((void*)&cksData, 0, sizeof(cksData));
161  cksData.Set(calcP->Type(csSize));
162  cksData.Length = csSize;
163  memcpy(cksData.Value, calcP->Final(), csSize);
164 
165  if ((rc = ossDF->Fstat(&Stat)))
166  {eLog.Emsg("clscls", rc, "get real-time checksum mtime for", fPath);
167  break;
168  }
169 
170  cksData.fmTime = static_cast<long long>(Stat.st_mtime);
171  cksData.csTime = static_cast<int>(time(0) - Stat.st_mtime);
172 
173  if ((rc = cksP->Set(fPath, cksData, 1)))
174  eLog.Emsg("ckscls", rc, "set real-time checksum for", fPath);
175 
176  break;
177  }
178 
179 // Issue close to the underlying object
180 //
181  Dirty = true; // Prevent re-entry processing
182  cksMtx.UnLock();
183  return wrapDF.Close(retsz);
184 }
185 
186 /******************************************************************************/
187 /* F t r u n c a t e */
188 /******************************************************************************/
189 /*
190  Function: Set the length of associated file to 'flen'.
191 
192  Input: flen - The new size of the file.
193 
194  Output: Returns XrdOssOK upon success and -errno upon failure.
195 */
196 int XrdOfsCksFile::Ftruncate(unsigned long long flen)
197 {
198 
199 // Execute the truncate
200 //
201  int rc = wrapDF.Ftruncate(flen);
202 
203 // We support streaming checksum only when the truncate makes the file 0 length
204 //
205  if (!Dirty)
206  {if (rc < 0)
207  {eLog.Emsg("ckstrunc", rc, "continue real-time checksum for", fPath);
208  Dirty = true;
209  } else {
210  if (flen)
211  {eLog.Emsg("ckstrunc","Unable to continue real-time checksum for",
212  fPath, "; truncate arg not 0.");
213  Dirty = true;
214  } else {
215  nextOff = 0;
216  segMap.clear();
217  calcP->Init();
218  }
219  }
220  }
221 
222 // All done
223 //
224  return rc;
225 }
226 
227 /******************************************************************************/
228 /* I n i t */
229 /******************************************************************************/
230 
232 {
233 // Record the checksum manager
234 //
235  cksP = cp;
236 }
237 
238 /******************************************************************************/
239 /* O p e n */
240 /******************************************************************************/
241 
242 int XrdOfsCksFile::Open(const char* path, int Oflag, mode_t Mode,
243  XrdOucEnv& env)
244 {
245 // Make sure we have a clean setup. If not return an error.
246 //
247  if (Dirty) return -ENOTSUP;
248 
249 // We intercept open because for non-combinable checksums we need to make
250 // sure the file is open in r/w mode. Since we don't support such checksums
251 // yet, the code below is commented out.
252 //
253 /* if (!(calcp->Combinable()))
254  {Oflag &= ~O_ACCMODE;
255  Oflag |= O_RDWR;
256  }
257 */
258 // Issue open and if unsuccessful, mark this as a dirty object
259 //
260  int rc = wrapDF.Open(path, Oflag, Mode, env);
261  if (rc) Dirty = true;
262  return rc;
263 }
264 
265 /******************************************************************************/
266 /* p g W r i t e */
267 /******************************************************************************/
268 
269 ssize_t XrdOfsCksFile::pgWrite(void* buffer,
270  off_t offset,
271  size_t wrlen,
272  uint32_t* csvec,
273  uint64_t opts)
274 {
275  const char* eText;
276 
277 // We will only continue the checksum if the underlying write succeeds
278 //
279  ssize_t retval = wrapDF.pgWrite(buffer, offset, wrlen, csvec, opts);
280 
281 // Continue the streaming checksum if at all possible
282 //
283  if (!Dirty)
284  {if (retval < 0)
285  {eLog.Emsg("ckspgw", retval, "continue real-time checksum for",fPath);
286  Dirty = true;
287  } else {
288  if ((eText = (this->*ProcessRTC)(buffer, offset, wrlen)))
289  {eLog.Emsg("ckspgw","unable to continue real-time checksum for",
290  fPath, eText);
291  }
292  }
293  }
294 
295 // Return actual result
296 //
297  return retval;
298 }
299 
300 /******************************************************************************/
301 
302 int XrdOfsCksFile::pgWrite(XrdSfsAio* aioparm, uint64_t opts)
303 {
304  const char* eText;
305 
306 // It is too complicated to do the async I/O before doing the checksum
307 //
308  if (!Dirty)
309  {if ((eText = (this->*ProcessRTC)((void *)aioparm->sfsAio.aio_buf,
310  (off_t) aioparm->sfsAio.aio_offset,
311  (size_t)aioparm->sfsAio.aio_nbytes)))
312  {eLog.Emsg("cksaiopw", "Unable to continue real-time checksum for",
313  fPath, eText);
314  Dirty = true;
315  }
316  }
317 
318 // Now do the I/O
319 //
320  return wrapDF.pgWrite(aioparm, opts);
321 }
322 
323 /******************************************************************************/
324 /* w r i t e */
325 /******************************************************************************/
326 
327 /*
328  Function: Write `blen' bytes to the associated file, from 'buff'
329  and return the actual number of bytes written.
330 
331  Input: buff - Address of the buffer from which to get the data.
332  offset - The absolute 64-bit byte offset at which to write.
333  blen - The number of bytes to write from the buffer.
334 
335  Output: Returns the number of bytes written upon success and -errno o/w.
336 */
337 
338 ssize_t XrdOfsCksFile::Write(const void* buff, off_t offset, size_t blen)
339 {
340  const char* eText;
341 
342 // We will only continue the checksum if the underlying write succeeds
343 //
344  ssize_t retval = wrapDF.Write(buff, offset, blen);
345 
346 // Continue the streaming checksum if at all possible
347 //
348  if (!Dirty)
349  {if (retval < 0)
350  {eLog.Emsg("cksw", retval, "continue streaming checksum for", fPath);
351  Dirty = true;
352  } else {
353  if ((eText = (this->*ProcessRTC)(buff, offset, blen)))
354  {eLog.Emsg("cksw", "Unable to continue real-time checksum for",
355  fPath, eText);
356  Dirty = true;
357  }
358  }
359  }
360 
361 // Return actual result
362 //
363  return retval;
364 }
365 
366 /******************************************************************************/
367 
369 {
370  const char* eText;
371 
372 // It is too complicated to do the async I/O before doing the checksum
373 //
374  if (!Dirty)
375  {if ((eText = (this->*ProcessRTC)((void *)aioparm->sfsAio.aio_buf,
376  (off_t) aioparm->sfsAio.aio_offset,
377  (size_t)aioparm->sfsAio.aio_nbytes)))
378  {eLog.Emsg("cksaiopw", "Unable to continue real-time checksum for",
379  fPath, eText);
380  Dirty = true;
381  }
382  }
383 
384 // Now do the I/O
385 //
386  return wrapDF.Write(aioparm);
387 }
388 
389 /******************************************************************************/
390 /* W r i t e V */
391 /******************************************************************************/
392 
393 ssize_t XrdOfsCksFile::WriteV(XrdOucIOVec* writeV, int n)
394 {
395 
396 // We do not support streaming checksums when WriteV is used
397 //
398  if (!Dirty)
399  {eLog.Emsg("ckswv", "Unable to continue streaming checksum for",
400  fPath, "; WriteV() conflict.");
401  Dirty = true;
402  }
403 
404 // We still handle the actual write
405 //
406  return wrapDF.WriteV(writeV, n);
407 }
408 
409 /******************************************************************************/
410 /* P r i v a t e M e t h o d s */
411 /******************************************************************************/
412 /******************************************************************************/
413 /* R T C _ C B 3 2 */
414 /******************************************************************************/
415 
416 // This method handles combinable checkums that are 32 bits in length
417 //
418 const char* XrdOfsCksFile::RTC_CB32(const void* inBuff, off_t inOff, int inLen)
419 {
420  XrdSysMutexHelper mHelp(cksMtx);
421 
422 // Check where the incomming segment is adjacent to current segment
423 //
424  if (inOff == nextOff)
425  {calcP->Update((const char*)inBuff, inLen);
426  nextOff = inOff + inLen;
427  auto it = segMap.begin();
428  while(it != segMap.end() && nextOff == it->second.segBeg)
429  {// Combine or update base checksum
430  calcP->Combine((const char*)&(it->second.segCks),it->second.segLen);
431  nextOff = it->second.segBeg + it->second.segLen;
432  it = segMap.erase(it);
433  }
434 
435  // Verify that we end in a proper state
436  //
437  if (it != segMap.end() && nextOff > it->second.segBeg)
438  return "; I/O segments overlap";
439 
440  return 0;
441  }
442 
443 // Verify that incomming segment is past the expectected segment
444 //
445  if (inOff < nextOff) return "; ovewrite of previous data";
446 
447 // Compute checksum for the incomming block
448 //
449  uint32_t theCS;
450  memcpy(&theCS, altcP->Calc((const char*)inBuff, inLen), sizeof(theCS));
451 
452 // Create new segment and try inserting it into the map
453 //
454  inSeg newSeg(inOff, inLen, theCS);
455 
456 // Insert this element into the map
457 //
458  auto it = segMap.insert(std::pair(inOff, newSeg));
459  if (it.second == false)
460  return "; duplicate write";
461 
462 // All done
463 //
464  return 0;
465 }
466 
467 /******************************************************************************/
468 /* R T C _ N C X X */
469 /******************************************************************************/
470 
471 // This method handles noncombinable checkums of any legth.
472 //
473 const char* XrdOfsCksFile::RTC_NCXX(const void* inBuff, off_t inOff, int inLen)
474 {
475  XrdSysMutexHelper mHelp(cksMtx);
476 
477 // Check where the incomming segment is adjacent to current segment
478 //
479  if (inOff == nextOff)
480  {const char* eText;
481  calcP->Update((const char*)inBuff, inLen);
482  nextOff = inOff + inLen;
483  auto it = segMap.begin();
484  while(it != segMap.end() && nextOff == it->second.segBeg)
485  {// Update base checksum for subsequent blocks
486  if ((eText = RTC_Updt(it->second.segBeg, it->second.segLen)))
487  return eText;
488  nextOff = it->second.segBeg + it->second.segLen;
489  it = segMap.erase(it);
490  }
491 
492  // Verify that we end in a proper state
493  //
494  if (it != segMap.end() && nextOff > it->second.segBeg)
495  return "; I/O segments overlap";
496 
497  return 0;
498  }
499 
500 // Verify that incomming segment is past the expectected segment
501 //
502  if (inOff < nextOff) return "; ovewrite of previous data";
503 
504 // Create new segment and try inserting it into the map
505 //
506  inSeg newSeg(inOff, inLen, 0);
507 
508 // Insert this element into the map
509 //
510  auto it = segMap.insert(std::pair(inOff, newSeg));
511  if (it.second == false)
512  return "; duplicate write";
513 
514 // All done
515 //
516  return 0;
517 }
518 
519 /******************************************************************************/
520 /* R T C _ U p d t */
521 /******************************************************************************/
522 
523 const char* XrdOfsCksFile::RTC_Updt(off_t inOff, int inLen)
524 {
525  ssize_t ioLen, retval;
526 
527  while(inLen)
528  {ioLen = (inLen > ioBlen ? ioBlen : inLen);
529  if ((retval = ossDF->Read(ioBuff, inOff, ioLen)) != ioLen)
530  {if (retval >= 0) retval = -ENODATA;
531  return eLog.ec2text(retval);
532  }
533  calcP->Update((const char*)ioBuff, ioLen);
534  inOff += ioLen;
535  inLen -= ioLen;
536  }
537  return 0;
538 }
#define tident
struct stat Stat
Definition: XrdCks.cc:49
#define ENODATA
Definition: XrdOfsChkPnt.cc:48
XrdSysError OfsEroute
#define stat(a, b)
Definition: XrdPosix.hh:105
int Mode
struct myOpts opts
off_t aio_offset
Definition: XrdSfsAio.hh:49
size_t aio_nbytes
Definition: XrdSfsAio.hh:48
void * aio_buf
Definition: XrdSfsAio.hh:47
virtual char * Final()=0
virtual bool Combinable()
Definition: XrdCksCalc.hh:67
virtual void Update(const char *Buff, int BLen)=0
virtual char * Calc(const char *Buff, int BLen)
Definition: XrdCksCalc.hh:54
virtual const char * Combine(const char *Cksum, int DLen)
Definition: XrdCksCalc.hh:85
virtual const char * Type(int &csSize)=0
virtual void Recycle()
Recycle the checksum object as it is no longer needed. A default is given.
Definition: XrdCksCalc.hh:145
virtual void Init()=0
virtual XrdCksCalc * New()=0
Definition: XrdCks.hh:92
int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env) override
ssize_t Write(const void *buffer, off_t offset, size_t size) override
ssize_t WriteV(XrdOucIOVec *writeV, int wrvcnt) override
static void Init(XrdCks *cp, XrdOucEnv *ep)
XrdOfsCksFile(const char *tid, const char *path, XrdOssDF *df, XrdCksCalc *cP, bool &delFlag)
int Close(long long *retsz=0) override
ssize_t pgWrite(void *buffer, off_t offset, size_t wrlen, uint32_t *csvec, uint64_t opts) override
int Ftruncate(unsigned long long flen) override
virtual ~XrdOfsCksFile()
virtual ssize_t WriteV(XrdOucIOVec *writeV, int wrvcnt)
Definition: XrdOss.cc:273
virtual int Ftruncate(unsigned long long flen)
Definition: XrdOss.hh:192
virtual int Fstat(struct stat *buf)
Definition: XrdOss.hh:164
virtual int Close(long long *retsz=0)=0
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
Definition: XrdOss.hh:228
virtual ssize_t Read(off_t offset, size_t size)
Definition: XrdOss.hh:310
virtual ssize_t pgWrite(void *buffer, off_t offset, size_t wrlen, uint32_t *csvec, uint64_t opts)
Definition: XrdOss.cc:198
virtual ssize_t Write(const void *buffer, off_t offset, size_t size)
Definition: XrdOss.hh:385
XrdOssDF & wrapDF
struct aiocb sfsAio
Definition: XrdSfsAio.hh:62
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:116
static const char * ec2text(int ecode)
Definition: XrdSysError.cc:80
XrdSysError * eLog