Prado EFO PVA
rule_fcst_par_asyncio_notdone.py
Go to the documentation of this file.
1 # -*- coding: utf-8 -*-
2 """
3 Created on Mon May 18 19:48:52 2020
4 
5 @author: cd
6 """
7 
8 
9 import abc
10 import numpy as np
11 from efo.rule import RuleBase, RuleRlsSpecified
12 from efo.junction import ReservoirJunction
13 # import multiprocessing as mp
14 # from multiprocessing import Pool as p
15 import asyncio
16 import nest_asyncio
17 nest_asyncio.apply()
18 
19 # TODO: Numpy array variable should be passed as reference or pointer
20 # TODO: Function arguments should be labeled "ref" or "ptr" at beginning for clarification
21 # TODO: Function arguments that are array copies should be labeled "cpy"
22 
23 
class RuleFcstBase(RuleBase):
    """Abstract base for rules that derive a release from a forecast schedule.

    Subclasses implement ``calc_release_fcst``; ``calc_release`` stores the
    schedule it returns in ``self.releaseSched`` and returns element 1 of it.
    """

    def __init__(self, name, timeFcst):
        """Initialise the rule and allocate its result arrays.

        Parameters
        ----------
        name
            Passed through to the super class constructor.
        timeFcst
            Passed through to the super class constructor; ``self.T`` is
            expected to be available afterwards (it is read below).
        """
        # Call super class constructor
        super().__init__(name, timeFcst)
        # One entry per step of the continuous time object, NaN until set.
        self.release = np.full(self.T.Tcont.nSteps, np.nan)
        # One entry per forecast step; contents are uninitialised until
        # calc_release() stores a schedule here.
        self.releaseSched = np.empty(self.T.nSteps)

    def calc_release(self, rlsProposed, rlsUnCtrl, stor, rlsPrev=None, qIn=None):
        """Return the release taken from element 1 of the forecast schedule.

        The proposed release is first passed through the super class
        ``calc_release``; the result is then handed to ``calc_release_fcst``
        and the returned schedule is kept in ``self.releaseSched``.

        Returns
        -------
        ``self.releaseSched[1]``
        """
        rlsProposed = super().calc_release(
            rlsProposed, rlsPrev=rlsPrev, rlsUnCtrl=rlsUnCtrl, stor=stor, qIn=qIn)
        # TODO: Check to see if the current date is a forecast date
        self.releaseSched = self.calc_release_fcst(rlsProposed, rlsUnCtrl, stor, qIn)
        return self.releaseSched[1]

    @abc.abstractmethod
    def calc_release_fcst(self, rlsProposed, rlsUnCtrl, stor, qIn):
        """Return the release schedule; must be implemented by subclasses.

        ``calc_release`` indexes the returned object with ``[1]``.
        """
        # TODO: This should return a tuple that has the release for the current time step
        #  but also the release schedule and possibly the max and min?
        # TODO: In the world of OOP this should be a protected method because we only
        #  want to call this from other forecast rules, but in python not sure.
        pass

    @classmethod
    def __subclasshook__(cls, C):
        """Accept any class exposing every abstract method name of this class.

        Only applies when the check is made against ``RuleFcstBase`` itself;
        otherwise ``NotImplemented`` defers to the default mechanism.
        """
        if cls is RuleFcstBase:
            attrs = set(dir(C))
            if set(cls.__abstractmethods__) <= attrs:
                return True
        return NotImplemented
54 
55 
57  def __init__(self, name, timeFcst, efoResrvr, storTh, riskTol, constants, *,
58  efoNetwork=None, ruleRlsSpecified=None, efoRlsSched=None):
59  # Call super class constructor
60  super().__init__(name, timeFcst)
61  # Set sub class properties
62  self.efoNetworkefoNetwork = efoNetwork
63  self.efoResrvrefoResrvr = efoResrvr
64  self.ruleRlsSpecifiedruleRlsSpecified = ruleRlsSpecified
65  self.efoRlsSchedefoRlsSched = efoRlsSched
66  self.nMembersnMembers = len(efoResrvr)
67  # TODO: This shoud be a storage guide curve rule for flexibility
68  self.storThstorTh = float(storTh)
69  self.riskTolriskTol = riskTol
70  self.constantsconstants = constants
71  # if __name__ == "__main__": pool = mp.Pool(mp.cpu_count())
72  # self.pool = mp.Pool(mp.cpu_count())
73 
74  def calc_release_fcst(self, rlsProposed, rlsUnCtrl, stor, qIn=None):
75  # Set initial conditions of the ensemble networks to the current observed
76  # TODO: Need to create efo network that has the primary network and the ensembles
77  # TODO: Should this have max and min release properties for easier access for results?
78  if self.efoNetworkefoNetwork is not None:
79  for net in self.efoNetworkefoNetwork: net.set_init_cond()
80  # Initialize variables
81  t = self.T.get_fcst_time()
82  # self.storFcst = np.empty(t.nSteps, self.nMembers, t.nSteps - 1)
83  # self.storFcst[:] = np.nan
84  self.storFcststorFcst = np.full((t.nSteps, self.nMembersnMembers, t.nSteps-1), np.nan)
85  # self.storFcst[0, :, :] = [curRes.stor[0] for curRes in self.efoResrvr]
86  self.storFcststorFcst[0, :, :] = self.efoResrvrefoResrvr[0].stor[0]
87  # self.rlsFcst = np.full((t.nSteps, self.nMembers, t.nSteps - 1), np.nan)
88  self.rlsUnCtrlFcstrlsUnCtrlFcst = np.full((t.nSteps, self.nMembersnMembers, t.nSteps-1), np.nan)
89  self.rlsUnCtrlFcstrlsUnCtrlFcst[0, :, :] = self.efoResrvrefoResrvr[0].rlsUnCtrl[0]
90  self.rlsFcstrlsFcst = np.zeros((t.nSteps, self.nMembersnMembers, t.nSteps-1))
91  for i in range(t.nSteps-1):
92  self.rlsFcstrlsFcst[i+2:, 0:self.nMembersnMembers, i] = np.nan
93  # self.rlsFcst[0, :, :] = [curRes.rlsCtrl[0] for curRes in self.efoResrvr]
94  # TODO: Maybe use the passed rlsPrev here?
95  self.rlsFcstrlsFcst[0, :, :] = self.efoResrvrefoResrvr[0].rlsCtrl[0]
96  self.rlsMaxrlsMax = np.full((t.nSteps, self.nMembersnMembers, t.nSteps-1), np.nan)
97  self.rlsNoConstraintrlsNoConstraint = np.copy(self.rlsFcstrlsFcst)
98  self.prExcThprExcTh = np.full(t.nSteps, np.nan)
99  self.prExcThPreRlsprExcThPreRls = np.copy(self.prExcThprExcTh)
100  self.iRiskyMbrsiRiskyMbrs = np.full((t.nSteps, self.nMembersnMembers), False)
101  riskySelect = np.arange(0, self.nMembersnMembers, dtype=int)
102  riskyMbr = np.copy(riskySelect)
103  iRisky = np.full(self.nMembersnMembers, False)
104  iRiskyChk = np.full(self.nMembersnMembers, True)
105  volAbvThresh = np.zeros([t.nSteps, self.nMembersnMembers])
106  vol2Rls = np.zeros([t.nSteps, self.nMembersnMembers])
107  rlsFcst = np.full(t.nSteps, np.nan)
108  rlsFcst[0] = self.efoResrvrefoResrvr[0].rlsCtrl[0]
109  rlsMax = np.full(t.nSteps, np.nan)
110  rlsToday = np.empty((t.nSteps-1, self.nMembersnMembers))
111  rlsSched = np.zeros(t.nSteps)
112  for tsHoriz in t.steps[1:]:
113  self.T.step = tsHoriz
114  if tsHoriz > 1:
115  self.storFcststorFcst[:, :, tsHoriz-1] = self.storFcststorFcst[:, :, tsHoriz-2]
116  self.rlsUnCtrlFcstrlsUnCtrlFcst[:, :, tsHoriz-1] = self.rlsUnCtrlFcstrlsUnCtrlFcst[:, :, tsHoriz-2]
117  for j in range(self.nMembersnMembers):
118  # TODO: Maybe shouldn't set the override to 0 here?
119  if issubclass(type(self.ruleRlsSpecifiedruleRlsSpecified[riskySelect[j]]), RuleRlsSpecified):
120  self.ruleRlsSpecifiedruleRlsSpecified[riskySelect[j]].set_release(tsHoriz, 0)
121  self.efoResrvrefoResrvr[j].calc_qout()
122  # TODO: In a network this should just process up to the efoReseroir...
123  # TODO: and not points downstream to save compute time
124  self.storFcststorFcst[tsHoriz, riskySelect[j], tsHoriz-1] = self.efoResrvrefoResrvr[j].stor[tsHoriz]
125  self.rlsUnCtrlFcstrlsUnCtrlFcst[tsHoriz, riskySelect[j], tsHoriz-1] = \
126  self.efoResrvrefoResrvr[j].rlsUnCtrl[tsHoriz]
127  risky = True
128  while risky:
129  storPlusUnCtrl = self.storFcststorFcst[tsHoriz, :, tsHoriz-1]\
130  + self.rlsUnCtrlFcstrlsUnCtrlFcst[tsHoriz, :, tsHoriz-1]*self.constantsconstants.cfs2af
131  iRisky = (storPlusUnCtrl > self.storThstorTh) & iRiskyChk
132  self.prExcThprExcTh[tsHoriz] = np.sum(iRisky)/self.nMembersnMembers
133  if self.prExcThprExcTh[tsHoriz] > self.riskTolriskTol[tsHoriz]:
134  self.prExcThPreRlsprExcThPreRls[tsHoriz] = self.prExcThprExcTh[tsHoriz]
135  iRiskyChk = np.copy(iRisky)
136  # riskyMbr = np.where(self.storFcst[tsHoriz, :, tsHoriz-1] > self.storTh)[0]
137  riskyMbr = np.where(iRisky)[0]
138  nMbrs2Reduce = int(len(riskyMbr) - np.floor(self.riskTolriskTol[tsHoriz]*self.nMembersnMembers))
139  volAbvThresh[tsHoriz, riskyMbr] = \
140  self.storFcststorFcst[tsHoriz, riskyMbr, tsHoriz-1]\
141  + sum(self.rlsUnCtrlFcstrlsUnCtrlFcst[1:tsHoriz+1, riskyMbr, tsHoriz-1])*self.constantsconstants.cfs2af\
142  - self.storThstorTh
143  mbrSorted = np.argsort(volAbvThresh[tsHoriz, riskyMbr])
144  riskySelect = riskyMbr[mbrSorted[0:nMbrs2Reduce]]
145  self.iRiskyMbrsiRiskyMbrs[tsHoriz, riskySelect] = True
146  # TODO: Should add a while loop so you can account for evaportion iterations
147  # TODO: This percent increase (1.01) should be a property
148  # vol2Rls[tsHoriz, riskySelect] = \
149  # 1.01*(self.storFcst[tsHoriz, riskySelect, tsHoriz-1] - self.storTh)
150  vol2Rls[tsHoriz, riskySelect] = 1.01*volAbvThresh[tsHoriz, riskySelect]
151  task = []
152  loop = asyncio.get_event_loop()
153  for j in range(len(riskySelect)):
154  rlsFcst[1:tsHoriz+1] = np.sum(
155  vol2Rls[:, riskySelect[j]])*self.constantsconstants.af2cfs/tsHoriz
156  self.rlsNoConstraintrlsNoConstraint[1:tsHoriz+1, riskySelect[j], tsHoriz-1] = np.copy(rlsFcst[1:tsHoriz+1])
157  # rlsFcst[1:tsHoriz+1], rlsMax[1:tsHoriz+1] = self._calc_release_sched(
158  # rlsFcst[1:tsHoriz+1], tsHoriz, riskySelect[j])
159  task.append(asyncio.create_task(
160  self._calc_release_sched_calc_release_sched(rlsFcst[1:tsHoriz+1], tsHoriz, riskySelect[j])))
161  taskResults = loop.run_until_complete(asyncio.gather(*task))
162  for curResult in taskResults:
163  rlsFcst[1:tsHoriz+1] = curResult[0]
164  self.rlsMaxrlsMax[1:tsHoriz+1, riskySelect[j], tsHoriz-1] = curResult[1]
165  # self.rlsMax[1:tsHoriz+1, riskySelect[j], tsHoriz-1] = np.copy(rlsMax[1:tsHoriz+1])
166  if np.sum(rlsFcst[1:tsHoriz+1])*self.constantsconstants.cfs2af < np.sum(vol2Rls[:, riskySelect[j]]):
167  iRiskyChk[riskySelect[j]] = False
168  vol2Rls[tsHoriz, riskySelect[j]] = \
169  vol2Rls[tsHoriz, riskySelect[j]] - \
170  (np.sum(vol2Rls[:, riskySelect[j]]) - np.sum(rlsFcst[1:tsHoriz+1])*self.constantsconstants.cfs2af)
171  self.rlsFcstrlsFcst[1:tsHoriz+1, riskySelect[j], tsHoriz-1] = np.copy(rlsFcst[1:tsHoriz+1])
172  risky = True
173  else:
174  if np.isnan(self.prExcThPreRlsprExcThPreRls[tsHoriz]):
175  self.prExcThPreRlsprExcThPreRls[tsHoriz] = self.prExcThprExcTh[tsHoriz]
176  # TODO: Create a function that initializes these variables, would have to return...
177  # TODO: a tuple of multiple variables. Not sure this would save any code redundancy
178  riskySelect = np.arange(0, self.nMembersnMembers, dtype=int)
179  riskyMbr = np.copy(riskySelect)
180  iRiskyChk = np.full(self.nMembersnMembers, True)
181  risky = False
182  rlsToday[:, :] = self.rlsFcstrlsFcst[1, :, :].T
183  if np.any(rlsToday[:] > 0):
184  # TODO: You could have a count of the number of ens mbrs and ts that have the same day 1 release
185  # TODO: This could be used to provide multiple release schedules
186  tsMax, mbrMax = np.where(rlsToday==np.max(rlsToday))
187  # TODO: Does this need to be set since you return the release schedule and it is reset by super class?
188  self.releaserelease[self.T.Tcont.step] = self.rlsFcstrlsFcst[1, mbrMax[0], tsMax[0]]
189  rlsSched = self.rlsFcstrlsFcst[:, mbrMax[0], tsMax[0]]
190  else:
191  # TODO: Does this need to be set since you return the release schedule and it is reset by super class?
192  self.releaserelease[self.T.Tcont.step] = 0
193  return rlsSched
194 
195  @asyncio.coroutine
196  def _calc_release_sched(self, rlsFcst, tsHoriz, ensMbr):
197  pctConvg = 0.01
198  rlsTot = np.nansum(rlsFcst)
199  maxItr = 100
200  curItr = 0
201  # Create reference variables to ensemble reservoir objects
202  rlsMax = self.efoResrvrefoResrvr[ensMbr].rlsMax[1:tsHoriz+1]
203  rlsApplied = self.efoResrvrefoResrvr[ensMbr].rlsCtrl[1:tsHoriz+1]
204  rlsMaxPrev = np.zeros(len(rlsMax))
205  rlsFcstInit = np.copy(rlsFcst)
206  # idxZero = 1
207  while True:
208  # Recaculate storage
209  for i in range(1, tsHoriz+1):
210  if issubclass(type(self.efoResrvrefoResrvr[ensMbr]), ReservoirJunction):
211  self.T.step = i
212  if issubclass(type(self.ruleRlsSpecifiedruleRlsSpecified[ensMbr]), RuleRlsSpecified):
213  self.ruleRlsSpecifiedruleRlsSpecified[ensMbr].set_release(i, np.copy(rlsFcst[i-1]))
214  # TODO: In a network this should just process up to the efoReseroir...
215  # TODO: and not points downstream to save compute time
216  self.efoNetworkefoNetwork[ensMbr].process_fcst_junctions()
217  self.storFcststorFcst[i, ensMbr, tsHoriz-1] = np.copy(self.efoResrvrefoResrvr[ensMbr].stor[i])
218  # TODO: If you are maintaining forecasted storage levels then all future timesteps...
219  # TODO: should be at the max release schedule
220  if rlsTot - np.sum(rlsApplied) < pctConvg*rlsTot\
221  or np.all(np.absolute(rlsMaxPrev - rlsMax) < 0.01) or curItr > maxItr:
222  break
223  # zChk = np.array((rlsFcst, self.efoResrvr[ensMbr].rlsCtrl, self.storFcst[:, ensMbr, tsHoriz-1], rlsMax)).T
224  rlsFcst = self._get_adjusted_release_eqldist_get_adjusted_release_eqldist(rlsFcstInit, rlsMax)
225  rlsMaxPrev = np.copy(rlsMax)
226  curItr += 1
227  return rlsFcst, np.copy(rlsMax)
228 
229  def _get_adjusted_release_eqldist(self, rlsFcstInit, rlsMax):
230  rlsFcst = np.copy(rlsFcstInit)
231  rlsFcstCheck = np.zeros(len(rlsFcst))
232  iVal = ~np.isnan(rlsFcst) & ~np.isnan(rlsMax)
233  iMaxed = np.full(len(rlsFcst), False)
234  iMaxed[0] = True
235  # iMaxed = np.copy(iZeroStor)
236  while any(iMaxed[1:]==False) and any(rlsFcst[iVal] - rlsFcstCheck[iVal] != 0):
237  rlsFcstCheck = np.copy(rlsFcst)
238  iGrtrMax = rlsFcst > rlsMax
239  iMaxed = iMaxed | iGrtrMax
240  rlsDiffVol = np.sum(rlsFcst[iGrtrMax] - rlsMax[iGrtrMax])
241  rlsFcst[iGrtrMax] = rlsMax[iGrtrMax]
242  rlsAdd = rlsDiffVol/max(1,np.sum(~iMaxed & iVal))
243  rlsFcst[~iMaxed] += rlsAdd
244  return rlsFcst
245 
246 
248  def __init__(self, name, rulePfo, ruleEfoNoRls):
249  # Call super class constructor
250  super().__init__(name, rulePfo.T)
251  self.rulePforulePfo = rulePfo
252  self.ruleEfoNoRlsruleEfoNoRls = ruleEfoNoRls
253  self.riskEforiskEfo = np.full((self.T.Tcont.nSteps, self.T.nSteps), 0.)
254 
255  def calc_release_fcst(self, rlsProposed, rlsUnCtrl, stor, qIn=None):
256  zeroRls = self.ruleEfoNoRlsruleEfoNoRls.calc_release_fcst(rlsProposed, rlsUnCtrl, stor, qIn)
257  self.riskEforiskEfo[self.T.Tcont.step,1:] = self.ruleEfoNoRlsruleEfoNoRls.prExcTh[1:]
258  rlsSched = self.rulePforulePfo.calc_release_fcst(rlsProposed, rlsUnCtrl, stor, qIn)
259  # TODO: Does this need to be set since you return the release schedule and it is reset by super class?
260  self.releaserelease[self.T.Tcont.step] = rlsSched[1]
261  return rlsSched
262 
263 
264 # TODO: This should keep track of the mapping between the fcst reservoir and the network.
266  def __init__(self, name, timeFcst, fcstNetwork):
267  # Call super class constructor
268  super().__init__(name, timeFcst)
269  self.fcstNetworkfcstNetwork = fcstNetwork
270 
271  def calc_release_fcst(self, rlsProposed, rlsUnCtrl, stor):
272  pass
273 
274 
275 
def calc_release_fcst(self, rlsProposed, rlsUnCtrl, stor, qIn)
def calc_release(self, rlsProposed, rlsUnCtrl, stor, rlsPrev=None, qIn=None)
def _calc_release_sched(self, rlsFcst, tsHoriz, ensMbr)
def __init__(self, name, timeFcst, efoResrvr, storTh, riskTol, constants, *, efoNetwork=None, ruleRlsSpecified=None, efoRlsSched=None)
def calc_release_fcst(self, rlsProposed, rlsUnCtrl, stor, qIn=None)
def _get_adjusted_release_eqldist(self, rlsFcstInit, rlsMax)
def calc_release_fcst(self, rlsProposed, rlsUnCtrl, stor, qIn=None)
def calc_release_fcst(self, rlsProposed, rlsUnCtrl, stor)