5#include <zypp-core/zyppng/io/IODevice>
7#include <zypp-core/base/DtorReset>
9#include <zypp-media/MediaException>
10#include <zypp-media/FileCheckException>
11#include <zypp-media/CDTools>
24 , _workerPath( constants::DEFAULT_PROVIDE_WORKER_PATH.data() )
32 MIL <<
"Provider workdir is: " <<
_workDir << std::endl;
62 DBG <<
"Triggering the schedule timer (" <<
reasonStr <<
")" << std::endl;
73 MIL <<
"Provider is not started, NOT scheduling" << std::endl;
78 DBG_PRV <<
"Scheduling triggered during scheduling, returning immediately." << std::endl;
83#ifdef _SC_NPROCESSORS_ONLN
117 if ( (*iMedia)->refCount() > 1 ) {
118 MIL_PRV <<
"Not releasing media " << (*iMedia)->_name <<
" refcount is not zero" << std::endl;
124 if ( (*iMedia)->_idleSince && std::chrono::steady_clock::now() - (*iMedia)->_idleSince.value() >= std::chrono::hours(1) ) {
125 MIL <<
"Detaching medium " << (*iMedia)->_name <<
" for baseUrl " << (*iMedia)->_attachedUrl << std::endl;
129 MIL_PRV <<
"Not releasing media " << (*iMedia)->_name <<
" downloading worker and not timed out yet." << std::endl;
133 auto bQueue = (*iMedia)->_backingQueue.lock();
140 MIL <<
"Detaching medium " << (*iMedia)->_name <<
" for baseUrl " << (*iMedia)->_attachedUrl << std::endl;
145 ERR <<
"Could not send detach request, creating the request failed" << std::endl;
148 ERR <<
"Could not send detach request since no backing queue was defined" << std::endl;
157 const auto schedStart = std::chrono::steady_clock::now();
158 MIL_PRV <<
"Start scheduling" << std::endl;
161 const auto dur = std::chrono::steady_clock::now() -
schedStart;
162 MIL_PRV <<
"Exit scheduling after:" << std::chrono::duration_cast<std::chrono::milliseconds>(
dur ).count () << std::endl;
184 const auto &scheme =
queueIter->_schemeName;
192 MIL_PRV <<
"Start scheduling for scheme:" << scheme <<
" queue size is: " <<
queue.size() << std::endl;
196 ERR <<
"Scheme: " << scheme <<
" failed to return a valid configuration." << std::endl;
198 while(
queue.size() ) {
217 while (
i !=
queue.end() && !(*i) ) {
224 ProvideRequestRef
item = *
i;
228 if(
item->code() == ProvideMessage::Code::Attach ||
item->code() == ProvideMessage::Code::Detach ) {
235 MIL_PRV <<
"Trying to schedule request: " <<
item->urls().front() << std::endl;
251 for (
const auto &url :
item->urls() ) {
254 MIL <<
"Mirror URL " << url <<
" is incompatible with current scheme: " << scheme <<
", ignoring." << std::endl;
258 if(
item->owner()->canRedirectTo(
item, url ) )
261 MIL_PRV <<
"URL was rejected" << url << std::endl;
267 MIL <<
"Request has NO usable URLs" << std::endl;
302 MIL <<
"Current stats: " << std::endl;
311 MIL_PRV <<
"Reached maximum nr of connections, break" << std::endl;
320 MIL_PRV <<
"Free worker slots and available mirror URLs, starting a new worker" << std::endl;
327 if ( !
item->owner()->safeRedirectTo (
item, url ) )
331 if ( !
q->startup( scheme,
_workDir / scheme / url.getHost(), url.getHost() ) ) {
335 MIL_PRV <<
"Started worker for " << url.getHost() <<
" enqueing request" << std::endl;
337 item->setActiveUrl(url);
356 MIL_PRV <<
"No free worker slots, looking for the best existing worker" << std::endl;
361 if (
i->second->activeRequests () <
candidate->second->activeRequests () )
370 MIL_PRV <<
"Using existing worker " <<
candidate->first.getHost() <<
" to download request" << std::endl;
388 MIL_PRV <<
"No free worker slots, no slots in existing queues, trying to decomission idle queues." << std::endl;
401 if ( !
item->owner()->safeRedirectTo (
item, url ) )
405 if ( !
q->startup( scheme,
_workDir / scheme / url.getHost(), url.getHost() ) ) {
409 MIL_PRV <<
"Replaced worker for " << url.getHost() <<
", enqueing request" << std::endl;
411 item->setActiveUrl(url);
428 MIL_PRV <<
"End of line, deferring request for next try." << std::endl;
438 while (
i !=
queue.end() && !(*i) ) {
446 ProvideRequestRef
item = *
i;
450 if(
item->code() == ProvideMessage::Code::Attach ||
item->code() == ProvideMessage::Code::Detach ) {
452 if (
item->owner () )
457 MIL_PRV <<
"Trying to schedule request: " <<
item->urls().front() << std::endl;
475 MIL <<
"Mirror URL " <<
tmpurl <<
" is incompatible with current scheme: " << scheme <<
", ignoring." << std::endl;
484 MIL <<
"Request has NO usable URLs" << std::endl;
510 MIL <<
"Current stats: " << std::endl;
520 MIL_PRV <<
"Using existing idle worker to provide request" << std::endl;
522 item->owner()->redirectTo (
item, url );
523 item->setActiveUrl( url );
537 MIL_PRV <<
"Free CPU slots, starting a new worker" << std::endl;
540 item->owner()->redirectTo (
item, url );
543 if (
q->startup( scheme,
_workDir / scheme ) ) {
545 item->setActiveUrl(url);
563 MIL_PRV <<
"No free CPU slots, looking for the best existing worker" << std::endl;
568 if ( (*i)->activeRequests () < (*candidate)->activeRequests () )
573 item->owner()->redirectTo (
item, url );
575 MIL_PRV <<
"Using existing worker to provide request" << std::endl;
576 item->setActiveUrl( url );
577 (*candidate)->enqueue(
item );
587 MIL_PRV <<
"No free CPU slots, no slots in existing queues, trying to decomission idle queues." << std::endl;
595 item->owner()->redirectTo (
item, url );
598 if (
q->startup( scheme,
_workDir / scheme ) ) {
600 MIL_PRV <<
"Replaced worker, enqueing request" << std::endl;
602 item->setActiveUrl(url);
618 MIL_PRV <<
"No idle workers and no free CPU spots, wait for the next schedule run" << std::endl;
623 MIL_PRV <<
"End of line, deferring request for next try." << std::endl;
634 while (
i !=
queue.end() && !(*i) ) {
642 ProvideRequestRef
item = *
i;
643 MIL_PRV <<
"Trying to schedule request: " <<
item->urls().front() << std::endl;
650 MIL <<
"Mirror URL " <<
tmpurl <<
" is incompatible with current scheme: " << scheme <<
", ignoring." << std::endl;
659 MIL <<
"Request has NO usable URLs" << std::endl;
670 if ( !
q->startup( scheme,
_workDir / scheme ) ) {
671 ERR <<
"Worker startup failed!" << std::endl;
680 MIL_PRV <<
"Started worker, enqueing request" << std::endl;
684 MIL_PRV <<
"Found worker, enqueing request" << std::endl;
689 item->owner()->redirectTo (
item, url );
691 item->setActiveUrl(url);
699 std::list<ProvideItemRef> &ProvidePrivate::items()
706 return _credManagerOptions;
709 std::vector<AttachedMediaInfo_Ptr> &ProvidePrivate::attachedMediaInfos()
711 return _attachedMediaInfos;
716 if (
auto i = _schemeConfigs.find( scheme );
i != _schemeConfigs.end() ) {
721 if ( !
q.startup( scheme, _workDir / scheme ) ) {
724 auto newItem = _schemeConfigs.insert( std::make_pair( scheme,
q.workerConfig() ));
734 _fileCache.erase ( key );
741 i.first->second._deathTimer.reset();
742 return i.first->second._file;
746 return i.first->second._file;
752 return (_fileCache.count(key) > 0);
757 _items.push_back(
item );
758 schedule( ProvidePrivate::EnqueueItem );
763 auto elem = std::find_if( _items.begin(), _items.end(), [
item](
const auto &
i){ return i.get() == item; } );
764 if (
elem != _items.end() ) {
765 if ( _isScheduling ) {
773 std::string ProvidePrivate::nextMediaId()
const
779 AttachedMediaInfo_Ptr ProvidePrivate::addMedium( AttachedMediaInfo_Ptr &&
medium )
785 MIL_PRV <<
"Registered new media attachment with ID: " <<
medium->name() <<
" with mountPoint: (" <<
medium->_localMountPoint.value_or(
zypp::Pathname()) <<
")" << std::endl;
786 _attachedMediaInfos.push_back( std::move(
medium) );
788 return _attachedMediaInfos.back();
791 bool ProvidePrivate::queueRequest ( ProvideRequestRef req )
793 const auto &
schemeName = effectiveScheme( req->url().getScheme() );
795 return (qItem._schemeName == schemeName);
803 schedule( ProvidePrivate::EnqueueReq );
807 bool ProvidePrivate::dequeueRequest(ProvideRequestRef req , std::exception_ptr error)
809 auto queue = req->currentQueue ();
811 queue->cancel( req.get(), error );
815 for (
auto &
q : _queues ) {
816 auto elem = std::find(
q._requests.begin(),
q._requests.end(), req );
817 if (
elem !=
q._requests.end() ) {
818 q._requests.erase(
elem);
821 req->owner()->finishReq(
nullptr, req, error );
836 for (
const auto &
v : _workerQueues ) {
837 if (
v.second.get() == &
q )
843 bool ProvidePrivate::isRunning()
const
848 std::string ProvidePrivate::effectiveScheme(
const std::string &scheme)
const
851 if (
auto it = _workerAlias.find (
ss );
it != _workerAlias.end () ) {
857 void ProvidePrivate::onPulseTimeout(
Timer & )
859 DBG_PRV <<
"Pulse timeout" << std::endl;
861 auto now = std::chrono::steady_clock::now();
863 if ( _log ) _log->pulse();
866 for (
auto i = _fileCache.begin ();
i != _fileCache.end(); ) {
870 if ( now - *
cacheItem._deathTimer < std::chrono::seconds(20) ) {
871 MIL <<
"Releasing file " << *
i->second._file <<
" from cache, death timeout." << std::endl;
872 i = _fileCache.erase(
i);
877 cacheItem._deathTimer = std::chrono::steady_clock::now();
885 void ProvidePrivate::onQueueIdle()
887 if ( !_items.empty() )
889 for (
auto &[
k,
q] : _workerQueues ) {
900 if (
item.state() == ProvideItem::Finished ) {
902 auto i = std::find( _items.begin(), _items.end(),
itemRef );
903 if (
i == _items.end() ) {
904 ERR <<
"State of unknown Item changed, ignoring" << std::endl;
912 if ( _items.empty() )
916 uint32_t ProvidePrivate::nextRequestId()
919 return ++_nextRequestId;
923 : _parent( parent.weak_this<
Provide>() )
986 if ( urls.empty() ) {
992 std::optional<ProvideQueue::Config> scheme;
995 const auto &
s =
d->schemeConfig(
d->effectiveScheme(
mirrIt->getScheme() ) );
997 WAR <<
"URL: " << *
mirrIt <<
" is not supported, ignoring!" << std::endl;
1004 if ( scheme->worker_type () ==
s->worker_type () ) {
1007 WAR <<
"URL: " << *
mirrIt <<
" has different worker type than the primary URL: "<<
usableMirrs.front() <<
", ignoring!" << std::endl;
1017 auto &attachedMedia =
d->attachedMediaInfos ();
1018 for (
auto &
medium : attachedMedia ) {
1026 return op->promise();
1034 return op->promise();
1045 const auto i = std::find(
d->_attachedMediaInfos.begin(),
d->_attachedMediaInfos.end(),
attachHandle.mediaInfo() );
1046 if (
i ==
d->_attachedMediaInfos.end() ) {
1068 return op->promise();
1081 return expected<zypp::CheckSum>::success( zypp::CheckSum( algorithm, chksumRes.headers().value(algorithm).asString() ) );
1108 auto fName = source.file();
1110 | [
resSave = std::move(source) ] (
auto &&result ) {
1120 d->_isRunning =
true;
1121 d->_pulseTimer->start( 5000 );
1123 if (
d->_log )
d->_log->provideStart();
1128 d_func()->_workerPath = path;
1146 return d_func()->_workDir;
1152 return d->_credManagerOptions;
1157 d_func()->_credManagerOptions = opt;
1162 return d_func()->_sigIdle;
1167 return d_func()->_sigMediaChange;
1172 return d_func()->_sigAuthRequired;
1178 : _provider( parent )
1191 const auto &
fTime =
item.finishedTime();
1193 auto duration = std::chrono::duration_cast<std::chrono::seconds>(
item.finishedTime() -
item.startTime() );
1196 MIL <<
"Item finished after " << (
item.finishedTime() -
item.startTime()).
count() <<
" ns" << std::endl;
1203 MIL <<
"Item failed" << std::endl;
1222 for (
const auto &
i :
prov->d_func()->items() ) {
1236 const auto &
stats =
i->currentStats();
1239 ERR <<
"Bug! Stats should be initialized by now" << std::endl;
1254 const auto now = std::chrono::steady_clock::now();
1263 if (
sinceLast >= std::chrono::seconds(1) )
Reference counted access to a Tp object calling a custom Dispose function when the last AutoDispose handle to it is destroyed or reset.
reference value() const
Reference to the Tp object.
bool unique() const
Returns true if this is the only AutoDispose instance managing the current data object.
Store and operate with byte count.
Assign a variable a certain value when going out of scope.
Base class for Exception.
std::string getScheme() const
Returns the scheme name of the URL.
void setAuthority(const std::string &authority)
Set the authority component in the URL.
void setPathName(const std::string &path, EEncoding eflag=zypp::url::E_DECODED)
Set the path name.
bool isValid() const
Verifies the Url.
void appendPathName(const Pathname &path_r, EEncoding eflag_r=zypp::url::E_DECODED)
Extend the path name.
void setScheme(const std::string &scheme)
Set the scheme name in the URL.
Wrapper class for stat/lstat.
bool empty() const
Test for an empty path.
Pathname realpath() const
Returns this path as the absolute canonical pathname.
static auto connect(typename internal::MemberFunction< SenderFunc >::ClassType &s, SenderFunc &&sFun, typename internal::MemberFunction< ReceiverFunc >::ClassType &recv, ReceiverFunc &&rFunc)
static ProvideFileItemRef create(const std::vector< zypp::Url > &urls, const ProvideFileSpec &request, ProvidePrivate &parent)
const std::string queueName(ProvideQueue &q) const
std::string effectiveScheme(const std::string &scheme) const
std::list< ProvideItemRef > _items
ProvidePrivate(zypp::Pathname &&workDir, Provide &pub)
Timer::Ptr _scheduleTrigger
std::unordered_map< std::string, ProvideQueueRef > _workerQueues
std::vector< AttachedMediaInfo_Ptr > _attachedMediaInfos
void onPulseTimeout(Timer &)
std::deque< QueueItem > _queues
void schedule(ScheduleReason reason)
expected< ProvideQueue::Config > schemeConfig(const std::string &scheme)
std::chrono::time_point< std::chrono::steady_clock > TimePoint
static expected< ProvideRequestRef > createDetach(const zypp::Url &url)
A ProvideRes object is a reference counted ownership of a resource in the cache provided by a Provide...
virtual void provideStart()
const Stats & stats() const
ProvideStatus(ProvideRef parent)
virtual void itemFailed(ProvideItem &item)
virtual void itemDone(ProvideItem &item)
AsyncOpRef< expected< ProvideRes > > provide(const std::vector< zypp::Url > &urls, const ProvideFileSpec &request)
const zypp::media::CredManagerOptions & credManangerOptions() const
static ProvideRef create(const zypp::Pathname &workDir="")
AsyncOpRef< expected< MediaHandle > > attachMedia(const std::vector< zypp::Url > &urls, const ProvideMediaSpec &request)
void setWorkerPath(const zypp::Pathname &path)
SignalProxy< std::optional< zypp::media::AuthData >(const zypp::Url &reqUrl, const std::string &triedUsername, const std::map< std::string, std::string > &extraValues) sigAuthRequired)()
SignalProxy< void()> sigIdle()
void setCredManagerOptions(const zypp::media::CredManagerOptions &opt)
Provide(const zypp::Pathname &workDir)
AsyncOpRef< expected< zypp::CheckSum > > checksumForFile(const zypp::Pathname &p, const std::string &algorithm)
void setStatusTracker(ProvideStatusRef tracker)
bool ejectDevice(const std::string &queueRef, const std::string &device)
AsyncOpRef< expected< zypp::ManagedFile > > copyFile(const zypp::Pathname &source, const zypp::Pathname &target)
std::optional< Action > MediaChangeAction
const zypp::Pathname & providerWorkdir() const
SignalProxy< MediaChangeAction(const std::string &queueRef, const std::string &label, const int32_t mediaNr, const std::vector< std::string > &devices, const std::optional< std::string > &desc) sigMediaChangeRequested)()
ProvideMediaHandle MediaHandle
The Timer class provides repetitive and single-shot timers.
SignalProxy< void(Timer &t) sigExpired)()
This signal is always emitted when the timer expires.
static expected success(ConsParams &&...params)
String related utilities and Regular expression matching.
int unlink(const Pathname &path)
Like 'unlink'.
const std::string & asString(const std::string &t)
Global asString() that works with std::string too.
bool startsWith(const C_Str &str_r, const C_Str &prefix_r)
alias for hasPrefix
std::string stripSuffix(const C_Str &str_r, const C_Str &suffix_r)
Strip a suffix_r from str_r and return the resulting string.
Easy-to use interface to the ZYPP dependency resolver.
AutoDispose< const Pathname > ManagedFile
A Pathname plus associated cleanup code to be executed when path is no longer needed.
constexpr auto DEFAULT_ACTIVE_CONN_PER_HOST
constexpr auto DEFAULT_ACTIVE_CONN
constexpr auto DEFAULT_MAX_DYNAMIC_WORKERS
constexpr std::string_view ATTACHED_MEDIA_SUFFIX
std::conditional_t< isAsync, AsyncOpRef< T >, T > makeReadyResult(T &&result)
ResultType and_then(const expected< T, E > &exp, Function &&f)
bool provideDebugEnabled()
zypp::ByteCount _partialBytes
zypp::ByteCount _perSecondSinceLastPulse
zypp::ByteCount _perSecond
zypp::ByteCount _expectedBytes
std::chrono::steady_clock::time_point _startTime
zypp::ByteCount _finishedBytes
std::chrono::steady_clock::time_point _lastPulseTime
#define ZYPP_EXCPT_PTR(EXCPT)
Drops a logline and returns Exception as a std::exception_ptr.
#define L_ENV_CONSTR_DEFINE_FUNC(ENV)
#define ZYPP_IMPL_PRIVATE(Class)