XRootD
XrdPfcFile.cc
//----------------------------------------------------------------------------------
// Copyright (c) 2014 by Board of Trustees of the Leland Stanford, Jr., University
// Author: Alja Mrak-Tadel, Matevz Tadel
//----------------------------------------------------------------------------------
// XRootD is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// XRootD is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
//----------------------------------------------------------------------------------


20#include "XrdPfcFile.hh"
21#include "XrdPfc.hh"
23#include "XrdPfcIO.hh"
24#include "XrdPfcTrace.hh"
25
27#include "XrdSys/XrdSysTimer.hh"
28#include "XrdOss/XrdOss.hh"
29#include "XrdOuc/XrdOucEnv.hh"
31
32#include <cstdio>
33#include <sstream>
34#include <fcntl.h>
35#include <cassert>
36
37
38using namespace XrdPfc;
39
namespace
{

const int BLOCK_WRITE_MAX_ATTEMPTS = 4;

Cache* cache() { return &Cache::GetInstance(); }

}

const char *File::m_traceID = "File";

//------------------------------------------------------------------------------

File::File(const std::string& path, long long iOffset, long long iFileSize) :
   m_ref_cnt(0),
   m_data_file(0),
   m_info_file(0),
   m_cfi(Cache::GetInstance().GetTrace(), Cache::GetInstance().RefConfiguration().m_prefetch_max_blocks > 0),
   m_filename(path),
   m_offset(iOffset),
   m_file_size(iFileSize),
   m_current_io(m_io_set.end()),
   m_ios_in_detach(0),
   m_non_flushed_cnt(0),
   m_in_sync(false),
   m_detach_time_logged(false),
   m_in_shutdown(false),
   m_state_cond(0),
   m_block_size(0),
   m_num_blocks(0),
   m_resmon_token(-1),
   m_prefetch_state(kOff),
   m_prefetch_bytes(0),
   m_prefetch_read_cnt(0),
   m_prefetch_hit_cnt(0),
   m_prefetch_score(0)
{}

File::~File()
{
   TRACEF(Debug, "~File() for ");
}

void File::Close()
{
   // Close is called while nullptr is put into Cache::m_active map, see Cache::dec_ref_count(File*).
   // A stat is called after close to re-check that m_st_blocks has been reported correctly
   // to the resource-monitor. Note that the reporting is already clamped down to m_file_size
   // in report_and_merge_delta_stats() below.
   //
   // XFS can pre-allocate a significant number of blocks (1 GB at the 1 GB mark, 4 GB above 4 GB)
   // and those get reported in stat.st_blocks.
   // The reported number is correct in a stat immediately following a close.
   // If one starts off by writing the last byte of the file, this pre-allocation does not get
   // triggered up to that point -- but it comes back with a vengeance right after.
   //
   // To be determined if other FSes do something similar (Ceph, ZFS, ...). Ext4 doesn't.

   if (m_info_file)
   {
      TRACEF(Debug, "Close() closing info-file ");
      m_info_file->Close();
      delete m_info_file;
      m_info_file = nullptr;
   }

   if (m_data_file)
   {
      TRACEF(Debug, "Close() closing data-file ");
      m_data_file->Close();
      delete m_data_file;
      m_data_file = nullptr;
   }

   if (m_resmon_token >= 0)
   {
      // The last update of file stats has been sent from the final Sync unless we are in_shutdown --
      // but in that case the file will get unlinked by the cache and reported as a purge event.
      // Here we check whether the st_blocks reported so far are correct.
      if (m_stats.m_BytesWritten > 0 && ! m_in_shutdown) {
         struct stat s;
         int sr = Cache::GetInstance().GetOss()->Stat(m_filename.c_str(), &s);
         if (sr == 0 && s.st_blocks != m_st_blocks) {
            Stats stats;
            stats.m_StBlocksAdded = s.st_blocks - m_st_blocks;
            m_st_blocks = s.st_blocks;
            Cache::ResMon().register_file_update_stats(m_resmon_token, stats);
         }
      }

      Cache::ResMon().register_file_close(m_resmon_token, time(0), m_stats);
   }

   TRACEF(Debug, "Close() finished, prefetch score = " << m_prefetch_score);
}

//------------------------------------------------------------------------------

File* File::FileOpen(const std::string &path, long long offset, long long fileSize)
{
   File *file = new File(path, offset, fileSize);
   if ( ! file->Open())
   {
      delete file;
      file = 0;
   }
   return file;
}

//------------------------------------------------------------------------------

long long File::initiate_emergency_shutdown()
{
   // Called from Cache::Unlink() when the file is currently open.
   // Cache::Unlink is also called on FSync error and when a wrong number of bytes
   // is received from a remote read.
   //
   // From this point onward the file will not be written to, the cinfo file will
   // not be updated, and all new read requests will return -ENOENT.
   //
   // The File's entry in the Cache's active map is set to nullptr and will be
   // removed from there shortly, in any case, well before this File object
   // shuts down. Cache::Unlink() also reports the appropriate purge event.

   XrdSysCondVarHelper _lck(m_state_cond);

   m_in_shutdown = true;

   if (m_prefetch_state != kStopped && m_prefetch_state != kComplete)
   {
      m_prefetch_state = kStopped;
      cache()->DeRegisterPrefetchFile(this);
   }

   report_and_merge_delta_stats();

   return m_st_blocks;
}

//------------------------------------------------------------------------------

void File::check_delta_stats()
{
   // Called under m_state_cond lock.
   // BytesWritten indirectly triggers an unconditional merge through the periodic Sync().
   if (m_delta_stats.BytesReadAndWritten() >= m_resmon_report_threshold && ! m_in_shutdown)
      report_and_merge_delta_stats();
}

void File::report_and_merge_delta_stats()
{
   // Called under m_state_cond lock.
   struct stat s;
   m_data_file->Fstat(&s);
   // Do not report st_blocks beyond the 4 kB round-up over m_file_size. Some FSes report
   // aggressive pre-allocation in this field (XFS, up to 4 GB).
   long long max_st_blocks_to_report = (m_file_size & 0xfff) ? ((m_file_size >> 12) + 1) << 3
                                                             : m_file_size >> 9;
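   // Worked example of the formula above: a 10,000-byte file is not 4 kB aligned, so
   // (10000 >> 12) + 1 = 3 pages -> 3 << 3 = 24 512-byte blocks (12 kB); an exactly
   // 8192-byte file reports 8192 >> 9 = 16 blocks.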
   long long st_blocks_to_report = std::min((long long) s.st_blocks, max_st_blocks_to_report);
   m_delta_stats.m_StBlocksAdded = st_blocks_to_report - m_st_blocks;
   m_st_blocks = st_blocks_to_report;
   Cache::ResMon().register_file_update_stats(m_resmon_token, m_delta_stats);
   m_stats.AddUp(m_delta_stats);
   m_delta_stats.Reset();
}

//------------------------------------------------------------------------------

void File::BlockRemovedFromWriteQ(Block *b)
{
   TRACEF(Dump, "BlockRemovedFromWriteQ() block = " << (void*) b << " idx= " << b->m_offset/m_block_size);

   XrdSysCondVarHelper _lck(m_state_cond);
   dec_ref_count(b);
}

void File::BlocksRemovedFromWriteQ(std::list<Block*>& blocks)
{
   TRACEF(Dump, "BlocksRemovedFromWriteQ() n_blocks = " << blocks.size());

   XrdSysCondVarHelper _lck(m_state_cond);

   for (std::list<Block*>::iterator i = blocks.begin(); i != blocks.end(); ++i)
   {
      dec_ref_count(*i);
   }
}

//------------------------------------------------------------------------------

void File::ioUpdated(IO *io)
{
   std::string loc(io->GetLocation());
   XrdSysCondVarHelper _lck(m_state_cond);
   insert_remote_location(loc);
}

//------------------------------------------------------------------------------

bool File::ioActive(IO *io)
{
   // Returns true if delay is needed.

   TRACEF(Debug, "ioActive start for io " << io);

   std::string loc(io->GetLocation());

   {
      XrdSysCondVarHelper _lck(m_state_cond);

      IoSet_i mi = m_io_set.find(io);

      if (mi != m_io_set.end())
      {
         unsigned int n_active_reads = io->m_active_read_reqs;

         TRACE(Info, "ioActive for io " << io <<
               ", active_reads " << n_active_reads <<
               ", active_prefetches " << io->m_active_prefetches <<
               ", allow_prefetching " << io->m_allow_prefetching <<
               ", ios_in_detach " << m_ios_in_detach);
         TRACEF(Info,
                "\tio_map.size() " << m_io_set.size() <<
                ", block_map.size() " << m_block_map.size() << ", file");

         insert_remote_location(loc);

         io->m_allow_prefetching = false;
         io->m_in_detach = true;

         // Check if any IO is still available for prefetching. If not, stop it.
         if (m_prefetch_state == kOn || m_prefetch_state == kHold)
         {
            if ( ! select_current_io_or_disable_prefetching(false) )
            {
               TRACEF(Debug, "ioActive stopping prefetching after io " << io << " retreat.");
            }
         }

         // On the last IO, consider write queue blocks. Note, this also contains
         // blocks being prefetched.

         bool io_active_result;

         if (n_active_reads > 0)
         {
            io_active_result = true;
         }
         else if (m_io_set.size() - m_ios_in_detach == 1)
         {
            io_active_result = ! m_block_map.empty();
         }
         else
         {
            io_active_result = io->m_active_prefetches > 0;
         }

         if ( ! io_active_result)
         {
            ++m_ios_in_detach;
         }

         TRACEF(Info, "ioActive for io " << io << " returning " << io_active_result << ", file");

         return io_active_result;
      }
      else
      {
         TRACEF(Error, "ioActive io " << io << " not found in IoSet. This should not happen.");
         return false;
      }
   }
}

//------------------------------------------------------------------------------

void File::RequestSyncOfDetachStats()
{
   XrdSysCondVarHelper _lck(m_state_cond);
   m_detach_time_logged = false;
}

bool File::FinalizeSyncBeforeExit()
{
   // Returns true if sync is required.
   // This method is called after the corresponding IO is detached from PosixCache.

   XrdSysCondVarHelper _lck(m_state_cond);
   if ( ! m_in_shutdown)
   {
      if ( ! m_writes_during_sync.empty() || m_non_flushed_cnt > 0 || ! m_detach_time_logged)
      {
         report_and_merge_delta_stats();
         m_cfi.WriteIOStatDetach(m_stats);
         m_detach_time_logged = true;
         m_in_sync            = true;
         TRACEF(Debug, "FinalizeSyncBeforeExit requesting sync to write detach stats");
         return true;
      }
   }
   TRACEF(Debug, "FinalizeSyncBeforeExit sync not required");
   return false;
}

//------------------------------------------------------------------------------

void File::AddIO(IO *io)
{
   // Called from Cache::GetFile() when a new IO asks for the file.

   TRACEF(Debug, "AddIO() io = " << (void*)io);

   time_t now = time(0);
   std::string loc(io->GetLocation());

   m_state_cond.Lock();

   IoSet_i mi = m_io_set.find(io);

   if (mi == m_io_set.end())
   {
      m_io_set.insert(io);
      io->m_attach_time = now;
      m_delta_stats.IoAttach();

      insert_remote_location(loc);

      if (m_prefetch_state == kStopped)
      {
         m_prefetch_state = kOn;
         cache()->RegisterPrefetchFile(this);
      }
   }
   else
   {
      TRACEF(Error, "AddIO() io = " << (void*)io << " already registered.");
   }

   m_state_cond.UnLock();
}

//------------------------------------------------------------------------------

void File::RemoveIO(IO *io)
{
   // Called from Cache::ReleaseFile.

   TRACEF(Debug, "RemoveIO() io = " << (void*)io);

   time_t now = time(0);

   m_state_cond.Lock();

   IoSet_i mi = m_io_set.find(io);

   if (mi != m_io_set.end())
   {
      if (mi == m_current_io)
      {
         ++m_current_io;
      }

      m_delta_stats.IoDetach(now - io->m_attach_time);
      m_io_set.erase(mi);
      --m_ios_in_detach;

      if (m_io_set.empty() && m_prefetch_state != kStopped && m_prefetch_state != kComplete)
      {
         TRACEF(Error, "RemoveIO() io = " << (void*)io << " Prefetching is not stopped/complete -- it should be by now.");
         m_prefetch_state = kStopped;
         cache()->DeRegisterPrefetchFile(this);
      }
   }
   else
   {
      TRACEF(Error, "RemoveIO() io = " << (void*)io << " is NOT registered.");
   }

   m_state_cond.UnLock();
}

//------------------------------------------------------------------------------

bool File::Open()
{
   // Sets errno accordingly.

   static const char *tpfx = "Open() ";

   TRACEF(Dump, tpfx << "entered");

   // Before touching anything, check with the ResourceMonitor if a scan is in progress.
   // This function will wait internally, if needed, until it is safe to proceed.
   Cache::ResMon().CrossCheckIfScanIsInProgress(m_filename, m_state_cond);

   const Configuration &conf = Cache::GetInstance().RefConfiguration();

   XrdOss     &myOss  = * Cache::GetInstance().GetOss();
   const char *myUser = conf.m_username.c_str();
   XrdOucEnv   myEnv;
   struct stat data_stat, info_stat;

   std::string ifn = m_filename + Info::s_infoExtension;

   bool data_existed = (myOss.Stat(m_filename.c_str(), &data_stat) == XrdOssOK);
   bool info_existed = (myOss.Stat(ifn.c_str(), &info_stat) == XrdOssOK);

   // Create the data file itself.
   char size_str[32]; sprintf(size_str, "%lld", m_file_size);
   myEnv.Put("oss.asize", size_str);
   myEnv.Put("oss.cgroup", conf.m_data_space.c_str());

   int res;

   if ((res = myOss.Create(myUser, m_filename.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
   {
      TRACEF(Error, tpfx << "Create failed " << ERRNO_AND_ERRSTR(-res));
      errno = -res;
      return false;
   }

   m_data_file = myOss.newFile(myUser);
   if ((res = m_data_file->Open(m_filename.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
   {
      TRACEF(Error, tpfx << "Open failed " << ERRNO_AND_ERRSTR(-res));
      errno = -res;
      delete m_data_file; m_data_file = 0;
      return false;
   }

   myEnv.Put("oss.asize", "64k"); // Advisory, block-map and access list lengths vary.
   myEnv.Put("oss.cgroup", conf.m_meta_space.c_str());
   if ((res = myOss.Create(myUser, ifn.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
   {
      TRACE(Error, tpfx << "Create failed for info file " << ifn << ERRNO_AND_ERRSTR(-res));
      errno = -res;
      m_data_file->Close(); delete m_data_file; m_data_file = 0;
      return false;
   }

   m_info_file = myOss.newFile(myUser);
   if ((res = m_info_file->Open(ifn.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
   {
      TRACEF(Error, tpfx << "Open failed for info file " << ifn << ERRNO_AND_ERRSTR(-res));
      errno = -res;
      delete m_info_file; m_info_file = 0;
      m_data_file->Close(); delete m_data_file; m_data_file = 0;
      return false;
   }

   bool initialize_info_file = true;

   if (info_existed && m_cfi.Read(m_info_file, ifn.c_str()))
   {
      TRACEF(Debug, tpfx << "Reading existing info file. (data_existed=" << data_existed <<
             ", data_size_stat=" << (data_existed ? data_stat.st_size : -1ll) <<
             ", data_size_from_last_block=" << m_cfi.GetExpectedDataFileSize() << ")");

      // Check if the data file exists and is of reasonable size.
      if (data_existed && data_stat.st_size >= m_cfi.GetExpectedDataFileSize())
      {
         initialize_info_file = false;
      } else {
         TRACEF(Warning, tpfx << "Basic sanity checks on data file failed, resetting info file, truncating data file.");
         m_cfi.ResetAllAccessStats();
         m_data_file->Ftruncate(0);
         Cache::ResMon().register_file_purge(m_filename, data_stat.st_blocks);
      }
   }

   if ( ! initialize_info_file && m_cfi.GetCkSumState() != conf.get_cs_Chk())
   {
      if (conf.does_cschk_have_missing_bits(m_cfi.GetCkSumState()) &&
          conf.should_uvkeep_purge(time(0) - m_cfi.GetNoCkSumTimeForUVKeep()))
      {
         TRACEF(Info, tpfx << "Cksum state of file insufficient, uvkeep test failed, resetting info file, truncating data file.");
         initialize_info_file = true;
         m_cfi.ResetAllAccessStats();
         m_data_file->Ftruncate(0);
         Cache::ResMon().register_file_purge(m_filename, data_stat.st_blocks);
      } else {
         // TODO: If the file is complete, we don't need to reset net cksums.
         m_cfi.DowngradeCkSumState(conf.get_cs_Chk());
      }
   }

   if (initialize_info_file)
   {
      m_cfi.SetBufferSizeFileSizeAndCreationTime(conf.m_bufferSize, m_file_size);
      m_cfi.SetCkSumState(conf.get_cs_Chk());
      m_cfi.ResetNoCkSumTime();
      m_cfi.Write(m_info_file, ifn.c_str());
      m_info_file->Fsync();
      cache()->WriteFileSizeXAttr(m_info_file->getFD(), m_file_size);
      TRACEF(Debug, tpfx << "Creating new file info, data size = " << m_file_size << " num blocks = " << m_cfi.GetNBlocks());
   }
   else
   {
      if (futimens(m_info_file->getFD(), NULL)) {
         TRACEF(Error, tpfx << "failed setting modification time " << ERRNO_AND_ERRSTR(errno));
      }
   }

   m_cfi.WriteIOStatAttach();
   m_state_cond.Lock();
   m_block_size = m_cfi.GetBufferSize();
   m_num_blocks = m_cfi.GetNBlocks();
   m_prefetch_state = (m_cfi.IsComplete()) ? kComplete : kStopped; // Will engage in AddIO().

   m_data_file->Fstat(&data_stat);
   m_st_blocks = data_stat.st_blocks;

   m_resmon_token = Cache::ResMon().register_file_open(m_filename, time(0), data_existed);
   constexpr long long MB = 1024 * 1024;
   m_resmon_report_threshold = std::min(std::max(10 * MB, m_file_size / 20), 500 * MB);
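   // Example of the clamp above: a 1 GB file gives max(10 MB, ~51 MB) = ~51 MB, well under
   // the 500 MB cap; files below 200 MB sit at the 10 MB floor.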
   // m_resmon_report_threshold_scaler; // something like 10% of original threshold, to adjust
   // actual threshold based on return values from register_file_update_stats().

   m_state_cond.UnLock();

   return true;
}

int File::Fstat(struct stat &sbuff)
{
   // Stat on an open file.
   // Corrects size to the actual full size of the file.
   // Sets atime to 0 if the file is only partially downloaded, in accordance
   // with the pfc.onlyifcached settings.
   // Called from IO::Fstat() and Cache::Stat() when the file is active.
   // Returns 0 on success, -errno on error.

   int res;

   if ((res = m_data_file->Fstat(&sbuff))) return res;

   sbuff.st_size = m_file_size;

   bool is_cached = cache()->DecideIfConsideredCached(m_file_size, sbuff.st_blocks * 512ll);
   if ( ! is_cached)
      sbuff.st_atime = 0;

   return 0;
}

//==============================================================================
// Read and helpers
//==============================================================================

bool File::overlap(int blk,            // block to query
                   long long blk_size, //
                   long long req_off,  // offset of user request
                   int req_size,       // size of user request
                   // output:
                   long long &off,     // offset in user buffer
                   long long &blk_off, // offset in block
                   int &size)          // size to copy
{
   const long long beg     = blk * blk_size;
   const long long end     = beg + blk_size;
   const long long req_end = req_off + req_size;
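   // Example with 1 MB blocks: blk=2 covers [2 MB, 3 MB); a request with req_off=1.5 MB
   // and req_size=1 MB overlaps it in [2 MB, 2.5 MB), giving off=0.5 MB into the user
   // buffer, blk_off=0 and size=0.5 MB.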

   if (req_off < end && req_end > beg)
   {
      const long long ovlp_beg = std::max(beg, req_off);
      const long long ovlp_end = std::min(end, req_end);

      off     = ovlp_beg - req_off;
      blk_off = ovlp_beg - beg;
      size    = (int) (ovlp_end - ovlp_beg);

      assert(size <= blk_size);
      return true;
   }
   else
   {
      return false;
   }
}

//------------------------------------------------------------------------------

Block* File::PrepareBlockRequest(int i, IO *io, void *req_id, bool prefetch)
{
   // Must be called w/ state_cond locked.
   // Checks on size etc. should be done before.
   //
   // Reference count is 0, so increase it in the calling function if you want to
   // catch the block while it is still in memory.

   const long long off        = i * m_block_size;
   const int       last_block = m_num_blocks - 1;
   const bool      cs_net     = cache()->RefConfiguration().is_cschk_net();

   int blk_size, req_size;
   if (i == last_block) {
      blk_size = req_size = m_file_size - off;
      if (cs_net && req_size & 0xFFF) req_size = (req_size & ~0xFFF) + 0x1000;
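      // The round-up above: network checksums (pgRead) come per 4 kB page, so the request
      // size for the final partial block is padded to the next 4 kB boundary (e.g.
      // 0x2345 -> 0x3000), while blk_size keeps the true remaining size.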
   } else {
      blk_size = req_size = m_block_size;
   }

   Block *b   = 0;
   char  *buf = cache()->RequestRAM(req_size);

   if (buf)
   {
      b = new (std::nothrow) Block(this, io, req_id, buf, off, blk_size, req_size, prefetch, cs_net);

      if (b)
      {
         m_block_map[i] = b;

         // The actual Read request is issued in ProcessBlockRequests().

         if (m_prefetch_state == kOn && (int) m_block_map.size() >= Cache::GetInstance().RefConfiguration().m_prefetch_max_blocks)
         {
            m_prefetch_state = kHold;
            cache()->DeRegisterPrefetchFile(this);
         }
      }
      else
      {
         TRACEF(Dump, "PrepareBlockRequest() " << i << " prefetch " << prefetch << ", allocation failed.");
      }
   }

   return b;
}

void File::ProcessBlockRequest(Block *b)
{
   // This *must not* be called with block_map locked.

   BlockResponseHandler *brh = new BlockResponseHandler(b);

   if (XRD_TRACE What >= TRACE_Dump) {
      char buf[256];
      snprintf(buf, 256, "idx=%lld, block=%p, prefetch=%d, off=%lld, req_size=%d, buff=%p, resp_handler=%p ",
               b->get_offset()/m_block_size, (void*)b, b->m_prefetch, b->get_offset(), b->get_req_size(), (void*)b->get_buff(), (void*)brh);
      TRACEF(Dump, "ProcessBlockRequest() " << buf);
   }

   if (b->req_cksum_net())
   {
      b->get_io()->GetInput()->pgRead(*brh, b->get_buff(), b->get_offset(), b->get_req_size(),
                                      b->ref_cksum_vec(), 0, b->ptr_n_cksum_errors());
   } else {
      b->get_io()->GetInput()->Read(*brh, b->get_buff(), b->get_offset(), b->get_size());
   }
}

void File::ProcessBlockRequests(BlockList_t& blks)
{
   // This *must not* be called with block_map locked.

   for (BlockList_i bi = blks.begin(); bi != blks.end(); ++bi)
   {
      ProcessBlockRequest(*bi);
   }
}

//------------------------------------------------------------------------------

void File::RequestBlocksDirect(IO *io, ReadRequest *read_req, std::vector<XrdOucIOVec>& ioVec, int expected_size)
{
   int n_chunks    = ioVec.size();
   int n_vec_reads = (n_chunks - 1) / XrdProto::maxRvecsz + 1;
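   // Ceiling division over the per-request vector limit: e.g. 2500 chunks with a
   // 1024-entry maxRvecsz limit would need 3 vector reads.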

   TRACEF(DumpXL, "RequestBlocksDirect() issuing ReadV for n_chunks = " << n_chunks <<
          ", total_size = " << expected_size << ", n_vec_reads = " << n_vec_reads);

   DirectResponseHandler *handler = new DirectResponseHandler(this, read_req, n_vec_reads);

   int pos = 0;
   while (n_chunks > XrdProto::maxRvecsz) {
      io->GetInput()->ReadV( *handler, ioVec.data() + pos, XrdProto::maxRvecsz);
      pos      += XrdProto::maxRvecsz;
      n_chunks -= XrdProto::maxRvecsz;
   }
   io->GetInput()->ReadV( *handler, ioVec.data() + pos, n_chunks);
}

//------------------------------------------------------------------------------

int File::ReadBlocksFromDisk(std::vector<XrdOucIOVec>& ioVec, int expected_size)
{
   TRACEF(DumpXL, "ReadBlocksFromDisk() issuing ReadV for n_chunks = " << (int) ioVec.size() << ", total_size = " << expected_size);

   long long rs = m_data_file->ReadV(ioVec.data(), (int) ioVec.size());

   if (rs < 0)
   {
      TRACEF(Error, "ReadBlocksFromDisk neg retval = " << rs);
      return rs;
   }

   if (rs != expected_size)
   {
      TRACEF(Error, "ReadBlocksFromDisk incomplete size = " << rs);
      return -EIO;
   }

   return (int) rs;
}

//------------------------------------------------------------------------------

int File::Read(IO *io, char* iUserBuff, long long iUserOff, int iUserSize, ReadReqRH *rh)
{
   // rrc_func is ONLY called from async processing.
   // If this function returns anything other than -EWOULDBLOCK, rrc_func needs to be called by the caller.
   // This streamlines the implementation of synchronous IO::Read().

   TRACEF(Dump, "Read() sid: " << Xrd::hex1 << rh->m_seq_id << " size: " << iUserSize);

   m_state_cond.Lock();

   if (m_in_shutdown || io->m_in_detach)
   {
      m_state_cond.UnLock();
      return m_in_shutdown ? -ENOENT : -EBADF;
   }

   // Shortcut -- file is fully downloaded.

   if (m_cfi.IsComplete())
   {
      m_state_cond.UnLock();
      int ret = m_data_file->Read(iUserBuff, iUserOff, iUserSize);
      if (ret > 0) {
         XrdSysCondVarHelper _lck(m_state_cond);
         m_delta_stats.AddBytesHit(ret);
         check_delta_stats();
      }
      return ret;
   }

   XrdOucIOVec readV( { iUserOff, iUserSize, 0, iUserBuff } );

   return ReadOpusCoalescere(io, &readV, 1, rh, "Read() ");
}

//------------------------------------------------------------------------------

int File::ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh)
{
   TRACEF(Dump, "ReadV() for " << readVnum << " chunks.");

   m_state_cond.Lock();

   if (m_in_shutdown || io->m_in_detach)
   {
      m_state_cond.UnLock();
      return m_in_shutdown ? -ENOENT : -EBADF;
   }

   // Shortcut -- file is fully downloaded.

   if (m_cfi.IsComplete())
   {
      m_state_cond.UnLock();
      int ret = m_data_file->ReadV(const_cast<XrdOucIOVec*>(readV), readVnum);
      if (ret > 0) {
         XrdSysCondVarHelper _lck(m_state_cond);
         m_delta_stats.AddBytesHit(ret);
         check_delta_stats();
      }
      return ret;
   }

   return ReadOpusCoalescere(io, readV, readVnum, rh, "ReadV() ");
}

//------------------------------------------------------------------------------

int File::ReadOpusCoalescere(IO *io, const XrdOucIOVec *readV, int readVnum,
                             ReadReqRH *rh, const char *tpfx)
{
   // Non-trivial processing for Read and ReadV.
   // Entered under lock.
   //
   // Loop over required blocks:
   // - if on disk, ok;
   // - if in RAM or incoming, inc ref-count;
   // - otherwise request it and inc ref-count (unless RAM is full => request direct).
   // Unlock.

   int prefetch_cnt = 0;

   ReadRequest *read_req = nullptr;
   BlockList_t  blks_to_request; // blocks we are issuing a new remote request for

   std::unordered_map<Block*, std::vector<ChunkRequest>> blks_ready;

   std::vector<XrdOucIOVec> iovec_disk;
   std::vector<XrdOucIOVec> iovec_direct;
   int iovec_disk_total   = 0;
   int iovec_direct_total = 0;

   for (int iov_idx = 0; iov_idx < readVnum; ++iov_idx)
   {
      const XrdOucIOVec &iov = readV[iov_idx];
      long long iUserOff  = iov.offset;
      int       iUserSize = iov.size;
      char     *iUserBuff = iov.data;

      const int idx_first = iUserOff / m_block_size;
      const int idx_last  = (iUserOff + iUserSize - 1) / m_block_size;
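      // Example: with 1 MB blocks, a 2 MB read at offset 1.5 MB touches blocks 1..3.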

      TRACEF(DumpXL, tpfx << "sid: " << Xrd::hex1 << rh->m_seq_id << " idx_first: " << idx_first << " idx_last: " << idx_last);

      enum LastBlock_e { LB_other, LB_disk, LB_direct };

      LastBlock_e lbe = LB_other;

      for (int block_idx = idx_first; block_idx <= idx_last; ++block_idx)
      {
         TRACEF(DumpXL, tpfx << "sid: " << Xrd::hex1 << rh->m_seq_id << " idx: " << block_idx);
         BlockMap_i bi = m_block_map.find(block_idx);

         // overlap and read
         long long off;     // offset in user buffer
         long long blk_off; // offset in block
         int       size;    // size to copy

         overlap(block_idx, m_block_size, iUserOff, iUserSize, off, blk_off, size);

         // In RAM or incoming?
         if (bi != m_block_map.end())
         {
            inc_ref_count(bi->second);
            TRACEF(Dump, tpfx << (void*) iUserBuff << " inc_ref_count for existing block " << bi->second << " idx = " << block_idx);

            if (bi->second->is_finished())
            {
               // Note: blocks with errors should not be here !!!
               // They should be either removed or reissued in ProcessBlockResponse().
               assert(bi->second->is_ok());

               blks_ready[bi->second].emplace_back( ChunkRequest(nullptr, iUserBuff + off, blk_off, size) );

               if (bi->second->m_prefetch)
                  ++prefetch_cnt;
            }
            else
            {
               if ( ! read_req)
                  read_req = new ReadRequest(io, rh);

               // We have a lock on state_cond --> as we register the request before releasing the lock,
               // we are sure to get a call-in via the ChunkRequest handling when this block arrives.

               bi->second->m_chunk_reqs.emplace_back( ChunkRequest(read_req, iUserBuff + off, blk_off, size) );
               ++read_req->m_n_chunk_reqs;
            }

            lbe = LB_other;
         }
         // On disk?
         else if (m_cfi.TestBitWritten(offsetIdx(block_idx)))
         {
            TRACEF(DumpXL, tpfx << "read from disk " << (void*)iUserBuff << " idx = " << block_idx);

            if (lbe == LB_disk)
               iovec_disk.back().size += size;
            else
               iovec_disk.push_back( { block_idx * m_block_size + blk_off, size, 0, iUserBuff + off } );
            iovec_disk_total += size;

            if (m_cfi.TestBitPrefetch(offsetIdx(block_idx)))
               ++prefetch_cnt;

            lbe = LB_disk;
         }
         // Neither ... then we have to go get it ...
         else
         {
            if ( ! read_req)
               read_req = new ReadRequest(io, rh);

            // Is there room for one more RAM Block?
            Block *b = PrepareBlockRequest(block_idx, io, read_req, false);
            if (b)
            {
               TRACEF(Dump, tpfx << "inc_ref_count new " << (void*)iUserBuff << " idx = " << block_idx);
               inc_ref_count(b);
               blks_to_request.push_back(b);

               b->m_chunk_reqs.emplace_back(ChunkRequest(read_req, iUserBuff + off, blk_off, size));
               ++read_req->m_n_chunk_reqs;

               lbe = LB_other;
            }
            else // Nope ... read this directly without caching.
            {
               TRACEF(DumpXL, tpfx << "direct block " << block_idx << ", blk_off " << blk_off << ", size " << size);

               iovec_direct_total += size;
               read_req->m_direct_done = false;

               // Make sure we do not issue a ReadV with a chunk size above XrdProto::maxRVdsz.
               // The number of actual ReadVs issued, so as to not exceed the XrdProto::maxRvecsz
               // limit, is determined in RequestBlocksDirect().
               if (lbe == LB_direct && iovec_direct.back().size + size <= XrdProto::maxRVdsz) {
                  iovec_direct.back().size += size;
               } else {
                  long long in_offset = block_idx * m_block_size + blk_off;
                  char     *out_pos   = iUserBuff + off;
                  while (size > XrdProto::maxRVdsz) {
                     iovec_direct.push_back( { in_offset, XrdProto::maxRVdsz, 0, out_pos } );
                     in_offset += XrdProto::maxRVdsz;
                     out_pos   += XrdProto::maxRVdsz;
                     size      -= XrdProto::maxRVdsz;
                  }
                  iovec_direct.push_back( { in_offset, size, 0, out_pos } );
               }

               lbe = LB_direct;
            }
         }
      } // end for over blocks in an IOVec
   } // end for over readV IOVec

   inc_prefetch_hit_cnt(prefetch_cnt);

   m_state_cond.UnLock();

   // First, send out remote requests for new blocks.
   if ( ! blks_to_request.empty())
   {
      ProcessBlockRequests(blks_to_request);
      blks_to_request.clear();
   }

   // Second, send out remote direct read requests.
   if ( ! iovec_direct.empty())
   {
      RequestBlocksDirect(io, read_req, iovec_direct, iovec_direct_total);

      TRACEF(Dump, tpfx << "direct read requests sent out, n_chunks = " << (int) iovec_direct.size() << ", total_size = " << iovec_direct_total);
   }

   // Begin the synchronous part where we process data that is already in RAM or on disk.

   long long bytes_read = 0;
   int       error_cond = 0; // to be set to -errno

   // Third, process blocks that are available in RAM.
   if ( ! blks_ready.empty())
   {
      for (auto &bvi : blks_ready)
      {
         for (auto &cr : bvi.second)
         {
            TRACEF(DumpXL, tpfx << "ub=" << (void*)cr.m_buf << " from pre-finished block " << bvi.first->m_offset/m_block_size << " size " << cr.m_size);
            memcpy(cr.m_buf, bvi.first->m_buff + cr.m_off, cr.m_size);
            bytes_read += cr.m_size;
         }
      }
   }

   // Fourth, read blocks from disk.
   if ( ! iovec_disk.empty())
   {
      int rc = ReadBlocksFromDisk(iovec_disk, iovec_disk_total);
      TRACEF(DumpXL, tpfx << "from disk finished size = " << rc);
      if (rc >= 0)
      {
         bytes_read += rc;
      }
      else
      {
         error_cond = rc;
         TRACEF(Error, tpfx << "failed read from disk");
      }
   }

   // End of the synchronous part -- update with sync stats and determine the actual state of this read.
   // Note: remote reads might have already finished during the disk-read!

   m_state_cond.Lock();

   for (auto &bvi : blks_ready)
      dec_ref_count(bvi.first, (int) bvi.second.size());

   if (read_req)
   {
      read_req->m_bytes_read += bytes_read;
      if (error_cond)
         read_req->update_error_cond(error_cond);
      read_req->m_stats.m_BytesHit += bytes_read;
      read_req->m_sync_done = true;

      if (read_req->is_complete())
      {
         // Almost like FinalizeReadRequest(read_req) -- but no callout!
         m_delta_stats.AddReadStats(read_req->m_stats);
         check_delta_stats();
         m_state_cond.UnLock();

         int ret = read_req->return_value();
         delete read_req;
         return ret;
      }
      else
      {
         m_state_cond.UnLock();
         return -EWOULDBLOCK;
      }
   }
   else
   {
      m_delta_stats.m_BytesHit += bytes_read;
      check_delta_stats();
      m_state_cond.UnLock();

      // !!! No callout.

      return error_cond ? error_cond : bytes_read;
   }
}


//==============================================================================
// WriteBlock and Sync
//==============================================================================

void File::WriteBlockToDisk(Block *b)
{
   // Write the block buffer into the disk file.
   long long offset = b->m_offset - m_offset;
   long long size   = b->get_size();
   ssize_t   retval;

   if (m_cfi.IsCkSumCache())
      if (b->has_cksums())
         retval = m_data_file->pgWrite(b->get_buff(), offset, size, b->ref_cksum_vec().data(), 0);
      else
         retval = m_data_file->pgWrite(b->get_buff(), offset, size, 0, 0);
   else
      retval = m_data_file->Write(b->get_buff(), offset, size);

   if (retval < size)
   {
      if (retval < 0) {
         TRACEF(Error, "WriteToDisk() write error " << retval);
      } else {
         TRACEF(Error, "WriteToDisk() incomplete block write ret=" << retval << " (should be " << size << ")");
      }

      XrdSysCondVarHelper _lck(m_state_cond);

      dec_ref_count(b);

      return;
   }

   const int blk_idx = (b->m_offset - m_offset) / m_block_size;

   // Set written bit.
   TRACEF(Dump, "WriteToDisk() success, set bit for block " << b->m_offset << " size=" << size);

   bool schedule_sync = false;
   {
      XrdSysCondVarHelper _lck(m_state_cond);

      m_cfi.SetBitWritten(blk_idx);

      if (b->m_prefetch)
      {
         m_cfi.SetBitPrefetch(blk_idx);
      }
      if (b->req_cksum_net() && ! b->has_cksums() && m_cfi.IsCkSumNet())
      {
         m_cfi.ResetCkSumNet();
      }

      // Set the synced bit, or stash the block index if a sync is in progress.
      // Synced state is only written out to the cinfo file when the data file is synced.
      if (m_in_sync)
      {
         m_writes_during_sync.push_back(blk_idx);
      }
      else
      {
         m_cfi.SetBitSynced(blk_idx);
         ++m_non_flushed_cnt;
         if ((m_cfi.IsComplete() || m_non_flushed_cnt >= Cache::GetInstance().RefConfiguration().m_flushCnt) &&
             ! m_in_shutdown)
         {
            schedule_sync     = true;
            m_in_sync         = true;
            m_non_flushed_cnt = 0;
         }
      }
      // As soon as the reference count is decreased on the block, the
      // file object may be deleted. Thus, to avoid holding both locks at a time,
      // we defer the ref-count decrease until later if a sync is needed.
      if (!schedule_sync) {
         dec_ref_count(b);
      }
   }

   if (schedule_sync)
   {
      cache()->ScheduleFileSync(this);
      XrdSysCondVarHelper _lck(m_state_cond);
      dec_ref_count(b);
   }
}

//------------------------------------------------------------------------------

void File::Sync()
{
   TRACEF(Dump, "Sync()");

   int  ret    = m_data_file->Fsync();
   bool errorp = false;
   if (ret == XrdOssOK)
   {
      Stats loc_stats;
      {
         XrdSysCondVarHelper _lck(&m_state_cond);
         report_and_merge_delta_stats();
         loc_stats = m_stats;
      }
      m_cfi.WriteIOStat(loc_stats);
      m_cfi.Write(m_info_file, m_filename.c_str());
      int cret = m_info_file->Fsync();
      if (cret != XrdOssOK)
      {
         TRACEF(Error, "Sync cinfo file sync error " << cret);
         errorp = true;
      }
   }
   else
   {
      TRACEF(Error, "Sync data file sync error " << ret << ", cinfo file has not been updated");
      errorp = true;
   }

   if (errorp)
   {
      TRACEF(Error, "Sync failed, unlinking local files and initiating shutdown of File object");

      // Unlink will also call this->initiate_emergency_shutdown()
      Cache::GetInstance().UnlinkFile(m_filename, false);

      XrdSysCondVarHelper _lck(&m_state_cond);

      m_writes_during_sync.clear();
      m_in_sync = false;

      return;
   }

   int  written_while_in_sync;
   bool resync = false;
   {
      XrdSysCondVarHelper _lck(&m_state_cond);
      for (std::vector<int>::iterator i = m_writes_during_sync.begin(); i != m_writes_during_sync.end(); ++i)
      {
         m_cfi.SetBitSynced(*i);
      }
      written_while_in_sync = m_non_flushed_cnt = (int) m_writes_during_sync.size();
      m_writes_during_sync.clear();

      // If there were writes during sync and the file is now complete,
      // let us call Sync again without resetting the m_in_sync flag.
      if (written_while_in_sync > 0 && m_cfi.IsComplete() && ! m_in_shutdown)
         resync = true;
      else
         m_in_sync = false;
   }
   TRACEF(Dump, "Sync " << written_while_in_sync << " blocks written during sync." << (resync ? " File is now complete - resyncing." : ""));

   if (resync)
      Sync();
}


//==============================================================================
// Block processing
//==============================================================================

void File::free_block(Block* b)
{
   // Method always called under lock.
   int i = b->m_offset / m_block_size;
   TRACEF(Dump, "free_block block " << b << " idx = " << i);
   size_t ret = m_block_map.erase(i);
   if (ret != 1)
   {
      // assert might be a better option than a warning
      TRACEF(Error, "free_block did not erase " << i << " from map");
   }
   else
   {
      cache()->ReleaseRAM(b->m_buff, b->m_req_size);
      delete b;
   }

   if (m_prefetch_state == kHold && (int) m_block_map.size() < Cache::GetInstance().RefConfiguration().m_prefetch_max_blocks)
   {
      m_prefetch_state = kOn;
      cache()->RegisterPrefetchFile(this);
   }
}

//------------------------------------------------------------------------------

bool File::select_current_io_or_disable_prefetching(bool skip_current)
{
   // Method always called under lock. It also expects prefetch to be active.

   int  io_size = (int) m_io_set.size();
   bool io_ok   = false;

   if (io_size == 1)
   {
      io_ok = (*m_io_set.begin())->m_allow_prefetching;
      if (io_ok)
      {
         m_current_io = m_io_set.begin();
      }
   }
   else if (io_size > 1)
   {
      IoSet_i mi = m_current_io;
      if (skip_current && mi != m_io_set.end()) ++mi;

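      // Round-robin: walk at most io_size entries starting from m_current_io,
      // wrapping past end(), until an IO that still allows prefetching is found.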
      for (int i = 0; i < io_size; ++i)
      {
         if (mi == m_io_set.end()) mi = m_io_set.begin();

         if ((*mi)->m_allow_prefetching)
         {
            m_current_io = mi;
            io_ok = true;
            break;
         }
         ++mi;
      }
   }

   if ( ! io_ok)
   {
      m_current_io     = m_io_set.end();
      m_prefetch_state = kStopped;
      cache()->DeRegisterPrefetchFile(this);
   }

   return io_ok;
}

//------------------------------------------------------------------------------

void File::ProcessDirectReadFinished(ReadRequest *rreq, int bytes_read, int error_cond)
{
   // Called from DirectResponseHandler.
   // NOT under lock.

   if (error_cond)
      TRACEF(Error, "Read(), direct read finished with error " << -error_cond << " " << XrdSysE2T(-error_cond));

   m_state_cond.Lock();

   if (error_cond)
      rreq->update_error_cond(error_cond);
   else {
      rreq->m_stats.m_BytesBypassed += bytes_read;
      rreq->m_bytes_read += bytes_read;
   }

   rreq->m_direct_done = true;

   bool rreq_complete = rreq->is_complete();

   m_state_cond.UnLock();

   if (rreq_complete)
      FinalizeReadRequest(rreq);
}

void File::ProcessBlockError(Block *b, ReadRequest *rreq)
{
   // Called from ProcessBlockResponse().
   // YES under lock -- we have to protect m_block_map for recovery through multiple IOs.
   // Does not manage m_read_req.
   // Will not complete the request.

   TRACEF(Debug, "ProcessBlockError() io " << b->m_io << ", block " << b->m_offset/m_block_size <<
          " finished with error " << -b->get_error() << " " << XrdSysE2T(-b->get_error()));

   rreq->update_error_cond(b->get_error());
   --rreq->m_n_chunk_reqs;

   dec_ref_count(b);
}

void File::ProcessBlockSuccess(Block *b, ChunkRequest &creq)
{
   // Called from ProcessBlockResponse().
   // NOT under lock, as it does a memcpy of existing block data.
   // Acquires the lock for block, m_read_req and rreq state updates.

   ReadRequest *rreq = creq.m_read_req;

   TRACEF(Dump, "ProcessBlockSuccess() ub=" << (void*)creq.m_buf << " from finished block " << b->m_offset/m_block_size << " size " << creq.m_size);
   memcpy(creq.m_buf, b->m_buff + creq.m_off, creq.m_size);

   m_state_cond.Lock();

   rreq->m_bytes_read += creq.m_size;

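   // Accounting note: if this block's req_id is this very read request, the request itself
   // triggered the remote fetch (a miss); otherwise the block was fetched for another
   // request or by prefetch, and this request merely subscribed to it (a hit).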
   if (b->get_req_id() == (void*) rreq)
      rreq->m_stats.m_BytesMissed += creq.m_size;
   else
      rreq->m_stats.m_BytesHit += creq.m_size;

   --rreq->m_n_chunk_reqs;

   if (b->m_prefetch)
      inc_prefetch_hit_cnt(1);

   dec_ref_count(b);

   bool rreq_complete = rreq->is_complete();

   m_state_cond.UnLock();

   if (rreq_complete)
      FinalizeReadRequest(rreq);
}

void File::FinalizeReadRequest(ReadRequest *rreq)
{
   // Called from ProcessBlockResponse().
   // NOT under lock -- does a callout.
   {
      XrdSysCondVarHelper _lck(m_state_cond);
      m_delta_stats.AddReadStats(rreq->m_stats);
      check_delta_stats();
   }

   rreq->m_rh->Done(rreq->return_value());
   delete rreq;
}

void File::ProcessBlockResponse(Block *b, int res)
{
   static const char* tpfx = "ProcessBlockResponse ";

   TRACEF(Dump, tpfx << "block=" << b << ", idx=" << b->m_offset/m_block_size << ", off=" << b->m_offset << ", res=" << res);

   if (res >= 0 && res != b->get_size())
   {
      // Incorrect number of bytes received; apparently the size of the file on the remote
      // is different from what the cache expects it to be.
      TRACEF(Error, tpfx << "Wrong number of bytes received, assuming remote/local file size mismatch, unlinking local files and initiating shutdown of File object");
      Cache::GetInstance().UnlinkFile(m_filename, false);
   }

   m_state_cond.Lock();

   // Deregister block from IO's prefetch count, if needed.
   if (b->m_prefetch)
   {
      IO *io = b->get_io();
      IoSet_i mi = m_io_set.find(io);
      if (mi != m_io_set.end())
      {
         --io->m_active_prefetches;

         // If the request failed and the IO is still prefetching -- disable prefetching on this IO.
         if (res < 0 && io->m_allow_prefetching)
         {
            TRACEF(Debug, tpfx << "after failed prefetch on io " << io << " disabling prefetching on this io.");
            io->m_allow_prefetching = false;

            // Check if any IO is still available for prefetching. If not, stop it.
            if (m_prefetch_state == kOn || m_prefetch_state == kHold)
            {
               if ( ! select_current_io_or_disable_prefetching(false) )
               {
                  TRACEF(Debug, tpfx << "stopping prefetching after io " << b->get_io() << " marked as bad.");
               }
            }
         }

         // If the request failed with no subscribers -- delete the block and exit.
         if (b->m_refcnt == 0 && (res < 0 || m_in_shutdown))
         {
            free_block(b);
            m_state_cond.UnLock();
            return;
         }
         m_prefetch_bytes += b->get_size();
      }
      else
      {
         TRACEF(Error, tpfx << "io " << b->get_io() << " not found in IoSet.");
      }
   }

   if (res == b->get_size())
   {
      b->set_downloaded();
      TRACEF(Dump, tpfx << "inc_ref_count idx=" << b->m_offset/m_block_size);
      if ( ! m_in_shutdown)
      {
         // Increase ref-count for the writer.
         inc_ref_count(b);
         m_delta_stats.AddWriteStats(b->get_size(), b->get_n_cksum_errors());
         // No check for writes, report-and-merge is forced during Sync().
         cache()->AddWriteTask(b, true);
      }

      // Swap the chunk-reqs vector out of the Block; it will be processed outside of the lock.
      vChunkRequest_t creqs_to_notify;
      creqs_to_notify.swap( b->m_chunk_reqs );

      m_state_cond.UnLock();

      for (auto &creq : creqs_to_notify)
      {
         ProcessBlockSuccess(b, creq);
      }
   }
   else
   {
      if (res < 0) {
         bool new_error = b->get_io()->register_block_error(res);
         int  tlvl      = new_error ? TRACE_Error : TRACE_Debug;
         TRACEF_INT(tlvl, tpfx << "block " << b << ", idx=" << b->m_offset/m_block_size << ", off=" << b->m_offset
                    << ", io=" << b->get_io() << ", error=" << res);
      } else {
         bool first_p = b->get_io()->register_incomplete_read();
         int  tlvl    = first_p ? TRACE_Error : TRACE_Debug;
         TRACEF_INT(tlvl, tpfx << "block " << b << ", idx=" << b->m_offset/m_block_size << ", off=" << b->m_offset
                    << ", io=" << b->get_io() << " incomplete, got " << res << " expected " << b->get_size());
#if defined(__APPLE__) || defined(__GNU__) || (defined(__FreeBSD_kernel__) && defined(__GLIBC__)) || defined(__FreeBSD__)
         res = -EIO;
#else
         res = -EREMOTEIO;
#endif
      }
      b->set_error(res);

      // Loop over the Block's chunk-reqs vector, erroring out the ones with the same IO.
      // Collect the others with a different IO; the first of them will be used to reissue the request.
      // This is then done outside of the lock.
      std::list<ReadRequest*> rreqs_to_complete;
      vChunkRequest_t         creqs_to_keep;

      for (ChunkRequest &creq : b->m_chunk_reqs)
      {
         ReadRequest *rreq = creq.m_read_req;

         if (rreq->m_io == b->get_io())
         {
            ProcessBlockError(b, rreq);
            if (rreq->is_complete())
            {
               rreqs_to_complete.push_back(rreq);
            }
         }
         else
         {
            creqs_to_keep.push_back(creq);
         }
      }

      bool reissue = false;
      if ( ! creqs_to_keep.empty())
      {
         ReadRequest *rreq = creqs_to_keep.front().m_read_req;

         TRACEF(Debug, "ProcessBlockResponse() requested block " << (void*)b << " failed with another io " <<
                b->get_io() << " - reissuing request with my io " << rreq->m_io);

         b->reset_error_and_set_io(rreq->m_io, rreq);
         b->m_chunk_reqs.swap( creqs_to_keep );
         reissue = true;
      }

      m_state_cond.UnLock();

      for (auto rreq : rreqs_to_complete)
         FinalizeReadRequest(rreq);

      if (reissue)
         ProcessBlockRequest(b);
   }
}

//------------------------------------------------------------------------------

const char* File::lPath() const
{
   return m_filename.c_str();
}

//------------------------------------------------------------------------------

int File::offsetIdx(int iIdx) const
{
   return iIdx - m_offset/m_block_size;
}


//------------------------------------------------------------------------------

void File::Prefetch()
{
   // Check that the block is not on disk and not in RAM.
   // TODO: Could prefetch several blocks at once!
   //       blks_max could be an argument.

   BlockList_t blks;

   TRACEF(DumpXL, "Prefetch() entering.");
   {
      XrdSysCondVarHelper _lck(m_state_cond);

      if (m_prefetch_state != kOn)
      {
         return;
      }

      if ( ! select_current_io_or_disable_prefetching(true) )
      {
         TRACEF(Error, "Prefetch no available IO object found, prefetching stopped. This should not happen, i.e., prefetching should be stopped before.");
         return;
      }

      // Select block(s) to fetch.
      for (int f = 0; f < m_num_blocks; ++f)
      {
         if ( ! m_cfi.TestBitWritten(f))
         {
            int f_act = f + m_offset / m_block_size;

            BlockMap_i bi = m_block_map.find(f_act);
            if (bi == m_block_map.end())
            {
               Block *b = PrepareBlockRequest(f_act, *m_current_io, nullptr, true);
               if (b)
               {
                  TRACEF(Dump, "Prefetch take block " << f_act);
                  blks.push_back(b);
                  // Note: block ref_cnt not increased, it will be when placed into the write queue.

                  inc_prefetch_read_cnt(1);
               }
               else
               {
                  // This shouldn't happen as prefetching stops when RAM is 70% full.
                  TRACEF(Warning, "Prefetch allocation failed for block " << f_act);
               }
               break;
            }
         }
      }

      if (blks.empty())
      {
         TRACEF(Debug, "Prefetch file is complete, stopping prefetch.");
         m_prefetch_state = kComplete;
         cache()->DeRegisterPrefetchFile(this);
      }
      else
      {
         (*m_current_io)->m_active_prefetches += (int) blks.size();
      }
   }

   if ( ! blks.empty())
   {
      ProcessBlockRequests(blks);
   }
}


//------------------------------------------------------------------------------

float File::GetPrefetchScore() const
{
   return m_prefetch_score;
}

XrdSysError* File::GetLog()
{
   return Cache::GetInstance().GetLog();
}

XrdSysTrace* File::GetTrace()
{
   return Cache::GetInstance().GetTrace();
}

void File::insert_remote_location(const std::string &loc)
{
   if ( ! loc.empty())
   {
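      // A location may look like "user@host:port"; keep only the part after the
      // first '@' so just the host endpoint is recorded.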
      size_t p = loc.find_first_of('@');
      m_remote_locations.insert(&loc[p != std::string::npos ? p + 1 : 0]);
   }
}

std::string File::GetRemoteLocations() const
{
   std::string s;
   if ( ! m_remote_locations.empty())
   {
      size_t sl = 0;
      int    nl = 0;
      for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++nl)
      {
         sl += i->size();
      }
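      // Reserve exactly what the JSON-like list needs: 2 brackets + name bytes (sl)
      // + 2 quotes per name (2*nl) + commas between names (nl - 1) + 1 spare.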
      s.reserve(2 + sl + 2*nl + nl - 1 + 1);
      s = '[';
      int j = 1;
      for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++j)
      {
         s += '"'; s += *i; s += '"';
         if (j < nl) s += ',';
      }
      s += ']';
   }
   else
   {
      s = "[]";
   }
   return s;
}

//==============================================================================
//======================= RESPONSE HANDLERS ====================================
//==============================================================================

void BlockResponseHandler::Done(int res)
{
   m_block->m_file->ProcessBlockResponse(m_block, res);
   delete this;
}

//------------------------------------------------------------------------------

void DirectResponseHandler::Done(int res)
{
   m_mutex.Lock();

   int n_left = --m_to_wait;

   if (res < 0) {
      if (m_errno == 0) m_errno = res; // store the first reported error
   } else {
      m_bytes_read += res;
   }

   m_mutex.UnLock();

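   // Each of the parallel vector reads lands here once; only the call that brings
   // m_to_wait down to zero finalizes the request and deletes the handler.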
   if (n_left == 0)
   {
      m_file->ProcessDirectReadFinished(m_read_req, m_bytes_read, m_errno);
      delete this;
   }
}
Definition XrdPfcFile.hh:68