acquire-worker.cc 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621
  1. // -*- mode: cpp; mode: fold -*-
  2. // Description /*{{{*/
  3. // $Id: acquire-worker.cc,v 1.34 2001/05/22 04:42:54 jgg Exp $
  4. /* ######################################################################
  5. Acquire Worker
  6. The worker process can startup either as a Configuration prober
  7. or as a queue runner. As a configuration prober it only reads the
  8. configuration message and, unlike a queue runner, does not send the
  8. configuration back to the method.
  9. ##################################################################### */
  10. /*}}}*/
  11. // Include Files /*{{{*/
  12. #include <config.h>
  13. #include <apt-pkg/acquire-worker.h>
  14. #include <apt-pkg/acquire-item.h>
  15. #include <apt-pkg/configuration.h>
  16. #include <apt-pkg/error.h>
  17. #include <apt-pkg/fileutl.h>
  18. #include <apt-pkg/strutl.h>
  19. #include <iostream>
  20. #include <sstream>
  21. #include <fstream>
  22. #include <sys/stat.h>
  23. #include <unistd.h>
  24. #include <fcntl.h>
  25. #include <signal.h>
  26. #include <stdio.h>
  27. #include <errno.h>
  28. #include <apti18n.h>
  29. /*}}}*/
  30. using namespace std;
  31. // Worker::Worker - Constructor for Queue startup /*{{{*/
  32. // ---------------------------------------------------------------------
  33. /* */
  34. pkgAcquire::Worker::Worker(Queue *Q,MethodConfig *Cnf,
  35. pkgAcquireStatus *Log) : Log(Log)
  36. {
  37. OwnerQ = Q;
  38. Config = Cnf;
  39. Access = Cnf->Access;
  40. CurrentItem = 0;
  41. TotalSize = 0;
  42. CurrentSize = 0;
  43. Construct();
  44. }
  45. /*}}}*/
  46. // Worker::Worker - Constructor for method config startup /*{{{*/
  47. // ---------------------------------------------------------------------
  48. /* */
  49. pkgAcquire::Worker::Worker(MethodConfig *Cnf)
  50. {
  51. OwnerQ = 0;
  52. Config = Cnf;
  53. Access = Cnf->Access;
  54. CurrentItem = 0;
  55. TotalSize = 0;
  56. CurrentSize = 0;
  57. Construct();
  58. }
  59. /*}}}*/
  60. // Worker::Construct - Constructor helper /*{{{*/
  61. // ---------------------------------------------------------------------
  62. /* */
  63. void pkgAcquire::Worker::Construct()
  64. {
  65. NextQueue = 0;
  66. NextAcquire = 0;
  67. Process = -1;
  68. InFd = -1;
  69. OutFd = -1;
  70. OutReady = false;
  71. InReady = false;
  72. Debug = _config->FindB("Debug::pkgAcquire::Worker",false);
  73. }
  74. /*}}}*/
  75. // Worker::~Worker - Destructor /*{{{*/
  76. // ---------------------------------------------------------------------
  77. /* */
  78. pkgAcquire::Worker::~Worker()
  79. {
  80. close(InFd);
  81. close(OutFd);
  82. if (Process > 0)
  83. {
  84. /* Closing of stdin is the signal to exit and die when the process
  85. indicates it needs cleanup */
  86. if (Config->NeedsCleanup == false)
  87. kill(Process,SIGINT);
  88. ExecWait(Process,Access.c_str(),true);
  89. }
  90. }
  91. /*}}}*/
  92. // Worker::Start - Start the worker process /*{{{*/
  93. // ---------------------------------------------------------------------
  94. /* This forks the method and inits the communication channel */
  95. bool pkgAcquire::Worker::Start()
  96. {
  97. // Get the method path
  98. string Method = _config->FindDir("Dir::Bin::Methods") + Access;
  99. if (FileExists(Method) == false)
  100. {
  101. _error->Error(_("The method driver %s could not be found."),Method.c_str());
  102. if (Access == "https")
  103. _error->Notice(_("Is the package %s installed?"), "apt-transport-https");
  104. return false;
  105. }
  106. if (Debug == true)
  107. clog << "Starting method '" << Method << '\'' << endl;
  108. // Create the pipes
  109. int Pipes[4] = {-1,-1,-1,-1};
  110. if (pipe(Pipes) != 0 || pipe(Pipes+2) != 0)
  111. {
  112. _error->Errno("pipe","Failed to create IPC pipe to subprocess");
  113. for (int I = 0; I != 4; I++)
  114. close(Pipes[I]);
  115. return false;
  116. }
  117. for (int I = 0; I != 4; I++)
  118. SetCloseExec(Pipes[I],true);
  119. // Fork off the process
  120. Process = ExecFork();
  121. if (Process == 0)
  122. {
  123. // Setup the FDs
  124. dup2(Pipes[1],STDOUT_FILENO);
  125. dup2(Pipes[2],STDIN_FILENO);
  126. SetCloseExec(STDOUT_FILENO,false);
  127. SetCloseExec(STDIN_FILENO,false);
  128. SetCloseExec(STDERR_FILENO,false);
  129. const char *Args[2];
  130. Args[0] = Method.c_str();
  131. Args[1] = 0;
  132. execv(Args[0],(char **)Args);
  133. cerr << "Failed to exec method " << Args[0] << endl;
  134. _exit(100);
  135. }
  136. // Fix up our FDs
  137. InFd = Pipes[0];
  138. OutFd = Pipes[3];
  139. SetNonBlock(Pipes[0],true);
  140. SetNonBlock(Pipes[3],true);
  141. close(Pipes[1]);
  142. close(Pipes[2]);
  143. OutReady = false;
  144. InReady = true;
  145. // Read the configuration data
  146. if (WaitFd(InFd) == false ||
  147. ReadMessages() == false)
  148. return _error->Error(_("Method %s did not start correctly"),Method.c_str());
  149. RunMessages();
  150. if (OwnerQ != 0)
  151. SendConfiguration();
  152. return true;
  153. }
  154. /*}}}*/
  155. // Worker::ReadMessages - Read all pending messages into the list /*{{{*/
  156. // ---------------------------------------------------------------------
  157. /* */
  158. bool pkgAcquire::Worker::ReadMessages()
  159. {
  160. if (::ReadMessages(InFd,MessageQueue) == false)
  161. return MethodFailure();
  162. return true;
  163. }
  164. /*}}}*/
  165. // Worker::RunMessage - Empty the message queue /*{{{*/
  166. // ---------------------------------------------------------------------
  167. /* This takes the messages from the message queue and runs them through
  168. the parsers in order. */
  169. bool pkgAcquire::Worker::RunMessages()
  170. {
  171. while (MessageQueue.empty() == false)
  172. {
  173. string Message = MessageQueue.front();
  174. MessageQueue.erase(MessageQueue.begin());
  175. if (Debug == true)
  176. clog << " <- " << Access << ':' << QuoteString(Message,"\n") << endl;
  177. // Fetch the message number
  178. char *End;
  179. int Number = strtol(Message.c_str(),&End,10);
  180. if (End == Message.c_str())
  181. return _error->Error("Invalid message from method %s: %s",Access.c_str(),Message.c_str());
  182. string URI = LookupTag(Message,"URI");
  183. pkgAcquire::Queue::QItem *Itm = 0;
  184. if (URI.empty() == false)
  185. Itm = OwnerQ->FindItem(URI,this);
  186. // update used mirror
  187. string UsedMirror = LookupTag(Message,"UsedMirror", "");
  188. if (!UsedMirror.empty() &&
  189. Itm &&
  190. Itm->Description.find(" ") != string::npos)
  191. {
  192. Itm->Description.replace(0, Itm->Description.find(" "), UsedMirror);
  193. // FIXME: will we need this as well?
  194. //Itm->ShortDesc = UsedMirror;
  195. }
  196. // Determine the message number and dispatch
  197. switch (Number)
  198. {
  199. // 100 Capabilities
  200. case 100:
  201. if (Capabilities(Message) == false)
  202. return _error->Error("Unable to process Capabilities message from %s",Access.c_str());
  203. break;
  204. // 101 Log
  205. case 101:
  206. if (Debug == true)
  207. clog << " <- (log) " << LookupTag(Message,"Message") << endl;
  208. break;
  209. // 102 Status
  210. case 102:
  211. Status = LookupTag(Message,"Message");
  212. break;
  213. // 103 Redirect
  214. case 103:
  215. {
  216. if (Itm == 0)
  217. {
  218. _error->Error("Method gave invalid 103 Redirect message");
  219. break;
  220. }
  221. string NewURI = LookupTag(Message,"New-URI",URI.c_str());
  222. Itm->URI = NewURI;
  223. ItemDone();
  224. pkgAcquire::Item *Owner = Itm->Owner;
  225. pkgAcquire::ItemDesc Desc = *Itm;
  226. // Change the status so that it can be dequeued
  227. Owner->Status = pkgAcquire::Item::StatIdle;
  228. // Mark the item as done (taking care of all queues)
  229. // and then put it in the main queue again
  230. OwnerQ->ItemDone(Itm);
  231. OwnerQ->Owner->Enqueue(Desc);
  232. if (Log != 0)
  233. Log->Done(Desc);
  234. break;
  235. }
  236. // 200 URI Start
  237. case 200:
  238. {
  239. if (Itm == 0)
  240. {
  241. _error->Error("Method gave invalid 200 URI Start message");
  242. break;
  243. }
  244. CurrentItem = Itm;
  245. CurrentSize = 0;
  246. TotalSize = strtoull(LookupTag(Message,"Size","0").c_str(), NULL, 10);
  247. ResumePoint = strtoull(LookupTag(Message,"Resume-Point","0").c_str(), NULL, 10);
  248. Itm->Owner->Start(Message,strtoull(LookupTag(Message,"Size","0").c_str(), NULL, 10));
  249. // Display update before completion
  250. if (Log != 0 && Log->MorePulses == true)
  251. Log->Pulse(Itm->Owner->GetOwner());
  252. if (Log != 0)
  253. Log->Fetch(*Itm);
  254. break;
  255. }
  256. // 201 URI Done
  257. case 201:
  258. {
  259. if (Itm == 0)
  260. {
  261. _error->Error("Method gave invalid 201 URI Done message");
  262. break;
  263. }
  264. pkgAcquire::Item *Owner = Itm->Owner;
  265. pkgAcquire::ItemDesc Desc = *Itm;
  266. // Display update before completion
  267. if (Log != 0 && Log->MorePulses == true)
  268. Log->Pulse(Owner->GetOwner());
  269. OwnerQ->ItemDone(Itm);
  270. unsigned long long const ServerSize = strtoull(LookupTag(Message,"Size","0").c_str(), NULL, 10);
  271. bool isHit = StringToBool(LookupTag(Message,"IMS-Hit"),false) ||
  272. StringToBool(LookupTag(Message,"Alt-IMS-Hit"),false);
  273. // Using the https method the server might return 200, but the
  274. // If-Modified-Since condition is not satsified, libcurl will
  275. // discard the download. In this case, however, TotalSize will be
  276. // set to the actual size of the file, while ServerSize will be set
  277. // to 0. Therefore, if the item is marked as a hit and the
  278. // downloaded size (ServerSize) is 0, we ignore TotalSize.
  279. if (TotalSize != 0 && (!isHit || ServerSize != 0) && ServerSize != TotalSize)
  280. _error->Warning("Size of file %s is not what the server reported %s %llu",
  281. Owner->DestFile.c_str(), LookupTag(Message,"Size","0").c_str(),TotalSize);
  282. // see if there is a hash to verify
  283. string RecivedHash;
  284. HashString expectedHash(Owner->HashSum());
  285. if(!expectedHash.empty())
  286. {
  287. string hashTag = expectedHash.HashType()+"-Hash";
  288. string hashSum = LookupTag(Message, hashTag.c_str());
  289. if(!hashSum.empty())
  290. RecivedHash = expectedHash.HashType() + ":" + hashSum;
  291. if(_config->FindB("Debug::pkgAcquire::Auth", false) == true)
  292. {
  293. clog << "201 URI Done: " << Owner->DescURI() << endl
  294. << "RecivedHash: " << RecivedHash << endl
  295. << "ExpectedHash: " << expectedHash.toStr()
  296. << endl << endl;
  297. }
  298. }
  299. Owner->Done(Message, ServerSize, RecivedHash.c_str(), Config);
  300. ItemDone();
  301. // Log that we are done
  302. if (Log != 0)
  303. {
  304. if (isHit)
  305. {
  306. /* Hide 'hits' for local only sources - we also manage to
  307. hide gets */
  308. if (Config->LocalOnly == false)
  309. Log->IMSHit(Desc);
  310. }
  311. else
  312. Log->Done(Desc);
  313. }
  314. break;
  315. }
  316. // 400 URI Failure
  317. case 400:
  318. {
  319. if (Itm == 0)
  320. {
  321. _error->Error("Method gave invalid 400 URI Failure message");
  322. break;
  323. }
  324. // Display update before completion
  325. if (Log != 0 && Log->MorePulses == true)
  326. Log->Pulse(Itm->Owner->GetOwner());
  327. pkgAcquire::Item *Owner = Itm->Owner;
  328. pkgAcquire::ItemDesc Desc = *Itm;
  329. OwnerQ->ItemDone(Itm);
  330. // set some status
  331. if(LookupTag(Message,"FailReason") == "Timeout" ||
  332. LookupTag(Message,"FailReason") == "TmpResolveFailure" ||
  333. LookupTag(Message,"FailReason") == "ResolveFailure" ||
  334. LookupTag(Message,"FailReason") == "ConnectionRefused")
  335. Owner->Status = pkgAcquire::Item::StatTransientNetworkError;
  336. Owner->Failed(Message,Config);
  337. ItemDone();
  338. if (Log != 0)
  339. Log->Fail(Desc);
  340. break;
  341. }
  342. // 401 General Failure
  343. case 401:
  344. _error->Error("Method %s General failure: %s",Access.c_str(),LookupTag(Message,"Message").c_str());
  345. break;
  346. // 403 Media Change
  347. case 403:
  348. MediaChange(Message);
  349. break;
  350. }
  351. }
  352. return true;
  353. }
  354. /*}}}*/
  355. // Worker::Capabilities - 100 Capabilities handler /*{{{*/
  356. // ---------------------------------------------------------------------
  357. /* This parses the capabilities message and dumps it into the configuration
  358. structure. */
  359. bool pkgAcquire::Worker::Capabilities(string Message)
  360. {
  361. if (Config == 0)
  362. return true;
  363. Config->Version = LookupTag(Message,"Version");
  364. Config->SingleInstance = StringToBool(LookupTag(Message,"Single-Instance"),false);
  365. Config->Pipeline = StringToBool(LookupTag(Message,"Pipeline"),false);
  366. Config->SendConfig = StringToBool(LookupTag(Message,"Send-Config"),false);
  367. Config->LocalOnly = StringToBool(LookupTag(Message,"Local-Only"),false);
  368. Config->NeedsCleanup = StringToBool(LookupTag(Message,"Needs-Cleanup"),false);
  369. Config->Removable = StringToBool(LookupTag(Message,"Removable"),false);
  370. // Some debug text
  371. if (Debug == true)
  372. {
  373. clog << "Configured access method " << Config->Access << endl;
  374. clog << "Version:" << Config->Version <<
  375. " SingleInstance:" << Config->SingleInstance <<
  376. " Pipeline:" << Config->Pipeline <<
  377. " SendConfig:" << Config->SendConfig <<
  378. " LocalOnly: " << Config->LocalOnly <<
  379. " NeedsCleanup: " << Config->NeedsCleanup <<
  380. " Removable: " << Config->Removable << endl;
  381. }
  382. return true;
  383. }
  384. /*}}}*/
  385. // Worker::MediaChange - Request a media change /*{{{*/
  386. // ---------------------------------------------------------------------
  387. /* */
  388. bool pkgAcquire::Worker::MediaChange(string Message)
  389. {
  390. int status_fd = _config->FindI("APT::Status-Fd",-1);
  391. if(status_fd > 0)
  392. {
  393. string Media = LookupTag(Message,"Media");
  394. string Drive = LookupTag(Message,"Drive");
  395. ostringstream msg,status;
  396. ioprintf(msg,_("Please insert the disc labeled: "
  397. "'%s' "
  398. "in the drive '%s' and press enter."),
  399. Media.c_str(),Drive.c_str());
  400. status << "media-change: " // message
  401. << Media << ":" // media
  402. << Drive << ":" // drive
  403. << msg.str() // l10n message
  404. << endl;
  405. std::string const dlstatus = status.str();
  406. FileFd::Write(status_fd, dlstatus.c_str(), dlstatus.size());
  407. }
  408. if (Log == 0 || Log->MediaChange(LookupTag(Message,"Media"),
  409. LookupTag(Message,"Drive")) == false)
  410. {
  411. char S[300];
  412. snprintf(S,sizeof(S),"603 Media Changed\nFailed: true\n\n");
  413. if (Debug == true)
  414. clog << " -> " << Access << ':' << QuoteString(S,"\n") << endl;
  415. OutQueue += S;
  416. OutReady = true;
  417. return true;
  418. }
  419. char S[300];
  420. snprintf(S,sizeof(S),"603 Media Changed\n\n");
  421. if (Debug == true)
  422. clog << " -> " << Access << ':' << QuoteString(S,"\n") << endl;
  423. OutQueue += S;
  424. OutReady = true;
  425. return true;
  426. }
  427. /*}}}*/
  428. // Worker::SendConfiguration - Send the config to the method /*{{{*/
  429. // ---------------------------------------------------------------------
  430. /* */
  431. bool pkgAcquire::Worker::SendConfiguration()
  432. {
  433. if (Config->SendConfig == false)
  434. return true;
  435. if (OutFd == -1)
  436. return false;
  437. /* Write out all of the configuration directives by walking the
  438. configuration tree */
  439. std::ostringstream Message;
  440. Message << "601 Configuration\n";
  441. _config->Dump(Message, NULL, "Config-Item: %F=%V\n", false);
  442. Message << '\n';
  443. if (Debug == true)
  444. clog << " -> " << Access << ':' << QuoteString(Message.str(),"\n") << endl;
  445. OutQueue += Message.str();
  446. OutReady = true;
  447. return true;
  448. }
  449. /*}}}*/
  450. // Worker::QueueItem - Add an item to the outbound queue /*{{{*/
  451. // ---------------------------------------------------------------------
  452. /* Send a URI Acquire message to the method */
  453. bool pkgAcquire::Worker::QueueItem(pkgAcquire::Queue::QItem *Item)
  454. {
  455. if (OutFd == -1)
  456. return false;
  457. string Message = "600 URI Acquire\n";
  458. Message.reserve(300);
  459. Message += "URI: " + Item->URI;
  460. Message += "\nFilename: " + Item->Owner->DestFile;
  461. Message += Item->Owner->Custom600Headers();
  462. Message += "\n\n";
  463. if (Debug == true)
  464. clog << " -> " << Access << ':' << QuoteString(Message,"\n") << endl;
  465. OutQueue += Message;
  466. OutReady = true;
  467. return true;
  468. }
  469. /*}}}*/
  470. // Worker::OutFdRead - Out bound FD is ready /*{{{*/
  471. // ---------------------------------------------------------------------
  472. /* */
  473. bool pkgAcquire::Worker::OutFdReady()
  474. {
  475. int Res;
  476. do
  477. {
  478. Res = write(OutFd,OutQueue.c_str(),OutQueue.length());
  479. }
  480. while (Res < 0 && errno == EINTR);
  481. if (Res <= 0)
  482. return MethodFailure();
  483. OutQueue.erase(0,Res);
  484. if (OutQueue.empty() == true)
  485. OutReady = false;
  486. return true;
  487. }
  488. /*}}}*/
  489. // Worker::InFdRead - In bound FD is ready /*{{{*/
  490. // ---------------------------------------------------------------------
  491. /* */
  492. bool pkgAcquire::Worker::InFdReady()
  493. {
  494. if (ReadMessages() == false)
  495. return false;
  496. RunMessages();
  497. return true;
  498. }
  499. /*}}}*/
  500. // Worker::MethodFailure - Called when the method fails /*{{{*/
  501. // ---------------------------------------------------------------------
  502. /* This is called when the method is believed to have failed, probably because
  503. read returned -1. */
  504. bool pkgAcquire::Worker::MethodFailure()
  505. {
  506. _error->Error("Method %s has died unexpectedly!",Access.c_str());
  507. // do not reap the child here to show meaningfull error to the user
  508. ExecWait(Process,Access.c_str(),false);
  509. Process = -1;
  510. close(InFd);
  511. close(OutFd);
  512. InFd = -1;
  513. OutFd = -1;
  514. OutReady = false;
  515. InReady = false;
  516. OutQueue = string();
  517. MessageQueue.erase(MessageQueue.begin(),MessageQueue.end());
  518. return false;
  519. }
  520. /*}}}*/
  521. // Worker::Pulse - Called periodically /*{{{*/
  522. // ---------------------------------------------------------------------
  523. /* */
  524. void pkgAcquire::Worker::Pulse()
  525. {
  526. if (CurrentItem == 0)
  527. return;
  528. struct stat Buf;
  529. if (stat(CurrentItem->Owner->DestFile.c_str(),&Buf) != 0)
  530. return;
  531. CurrentSize = Buf.st_size;
  532. // Hmm? Should not happen...
  533. if (CurrentSize > TotalSize && TotalSize != 0)
  534. TotalSize = CurrentSize;
  535. }
  536. /*}}}*/
  537. // Worker::ItemDone - Called when the current item is finished /*{{{*/
  538. // ---------------------------------------------------------------------
  539. /* */
  540. void pkgAcquire::Worker::ItemDone()
  541. {
  542. CurrentItem = 0;
  543. CurrentSize = 0;
  544. TotalSize = 0;
  545. Status = string();
  546. }
  547. /*}}}*/