Coverage for gpaw/utilities/progressbar.py: 63%

84 statements  

« prev     ^ index     » next       coverage.py v7.7.1, created at 2025-07-14 00:18 +0000

1import sys 

2import functools 

3from time import time, sleep 

4 

5from gpaw.utilities import devnull 

6 

7from gpaw.utilities.memory import maxrss 

8 

9 

10class ProgressBar: 

11 def __init__(self, fd=sys.stdout, nobar=False): 

12 """Progress-bar. 

13 

14 Usage:: 

15 

16 pb = ProgressBar() 

17 for i in range(10): 

18 pb.update(i / 10.0) 

19 do_stuff() 

20 pb.finish() 

21 """ 

22 self.fd = fd 

23 

24 try: 

25 self.tty = fd.isatty() 

26 except AttributeError: 

27 self.tty = False 

28 

29 self.done = False 

30 self.n = None 

31 self.nobar = nobar 

32 self.t = time() 

33 self.symbols = ['-', '*'] 

34 fd.flush() 

35 

36 def update(self, x): 

37 """Update progress-bar (0 <= x <= 1).""" 

38 if x == 0 or self.done: 

39 return 

40 

41 if self.tty: 

42 N = 35 

43 elif self.nobar: 

44 N = 10 

45 else: 

46 N = 40 

47 

48 n = int(N * x) 

49 t = time() - self.t 

50 t_dt = self.format_time(t) 

51 est = self.format_time(t / x) 

52 p = functools.partial(print, file=self.fd) 

53 

54 if self.tty: 

55 bar = '-' * (n - 1) + self.symbols[int(t % len(self.symbols))] 

56 p(('\r{0} / {1} ({2:.0f}%) |{3:' + str(N) + '}| ') 

57 .format(t_dt, est, x * 100, bar), end='') 

58 p(f' {maxrss() / 1024**2:.0f} MB/core', end='') 

59 if x == 1: 

60 p() 

61 self.done = True 

62 self.fd.flush() 

63 elif self.nobar: 

64 if self.n is None: 

65 p(('Started: {:.0f} MB/core') 

66 .format(maxrss() / 1024**2)) 

67 self.n = 0 

68 if n > self.n: 

69 p(('{} of {} ({:.0f}%) {:.0f} MB/core') 

70 .format(t_dt, est, x * 100, maxrss() / 1024**2)) 

71 self.fd.flush() 

72 self.n = n 

73 if x == 1: 

74 p(f'Finished in {t_dt}') 

75 self.fd.flush() 

76 self.done = True 

77 else: 

78 if self.n is None: 

79 p(f'{t / x}s |', end='') 

80 self.n = 0 

81 if n > self.n: 

82 p('-' * (n - self.n), end='') 

83 self.fd.flush() 

84 self.n = n 

85 if x == 1: 

86 p(f'| Time: {t:.3f}s') 

87 self.fd.flush() 

88 self.done = True 

89 

90 def format_time(self, seconds): 

91 m, s = divmod(seconds, 60) 

92 h, m = divmod(m, 60) 

93 return f'{h:.0f}h{m:.0f}m{s:.0f}s' 

94 

95 def finish(self): 

96 self.update(1) 

97 

98 def enumerate(self, items): 

99 for i, item in enumerate(items): 

100 self.update(i / len(items)) 

101 yield i, item 

102 self.finish() 

103 

104 

105def test(): 

106 for fd in [sys.stdout, devnull, open('pb.txt', 'w')]: 

107 print(fd) 

108 pb = ProgressBar(fd) 

109 for i in range(20): 

110 pb.update(i / 20) 

111 sleep(0.03) 

112 pb.update((i + 1) / 20) 

113 pb.finish() 

114 

115 

116if __name__ == '__main__': 

117 test()