Skip to content
This repository was archived by the owner on Aug 25, 2024. It is now read-only.

Commit e61dac4

Browse files
John Andersenpdxjohnny
authored andcommitted
dffml: df: Add aenter and aexit to all
Signed-off-by: John Andersen <john.s.andersen@intel.com>
1 parent 1eb908d commit e61dac4

File tree

25 files changed

+867
-273
lines changed

25 files changed

+867
-273
lines changed

.travis.yml

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,31 +16,35 @@ env:
1616
- PLUGIN=.
1717
- PLUGIN=model/tensorflow
1818
- PLUGIN=feature/git
19+
- PLUGIN=feature/codesec
1920
- CHANGELOG=1
2021
- WHITESPACE=1
2122
cache:
2223
directories:
2324
- /home/travis/.cache/
2425
- /home/travis/.cargo/
26+
- /home/travis/.local/
2527
before_install:
2628
- mkdir -p /home/travis/.cache/
2729
- pip install coverage codecov
2830
- |
29-
curl -o /home/travis/.cache/cloc -sSL https://github.com/AlDanial/cloc/raw/1.80/cloc
30-
sudo cp /home/travis/.cache/cloc /usr/bin/cloc
31-
sudo chmod 755 /usr/bin/cloc
32-
- |
33-
curl https://sh.rustup.rs -sSf | sh -s -- -y
34-
source "$HOME/.cargo/env"
35-
export tokeisrc=/home/travis/.cache/tokeisrc/
36-
mkdir -p $tokeisrc
3731
if [ ! -f "$HOME/.cargo/bin/tokei" ]; then
32+
curl https://sh.rustup.rs -sSf | sh -s -- -y
33+
source "$HOME/.cargo/env"
34+
export tokeisrc=/home/travis/.cache/tokeisrc/
35+
mkdir -p $tokeisrc
3836
git clone https://github.com/Aaronepower/tokei.git --depth 1 $tokeisrc
3937
cd $tokeisrc
4038
cargo build --release
4139
cd -
4240
mv $tokeisrc/target/release/tokei "$HOME/.cargo/bin/tokei"
4341
fi
42+
- |
43+
export clocpath="$HOME/.cargo/bin/cloc"
44+
mkdir -p $(dirname $clocpath)
45+
if [ ! -f "$clocpath" ]; then
46+
curl -o $clocpath -sSL https://github.com/AlDanial/cloc/raw/1.80/cloc
47+
fi
4448
- ''
4549
script:
4650
- ./.ci/run.sh

dffml/cli.py

Lines changed: 55 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -159,75 +159,63 @@ class OperationsAll(OperationsCMD):
159159

160160
# async def operations(self, sources, features):
161161
async def run_operations(self, sources):
162-
# Orchestrate the running of these operations
163-
async with self.dff(
164-
input_network = self.input_network(
165-
self.input_network.config(self)),
166-
operation_network = self.operation_network(
167-
self.operation_network.config(self)),
168-
lock_network = self.lock_network(
169-
self.lock_network.config(self)),
170-
rchecker = self.rchecker(
171-
self.rchecker.config(self)),
172-
opimpn = self.opimpn(self.opimpn.config(self)),
173-
orchestrator = self.orchestrator(
174-
self.orchestrator.config(self))
175-
) as dffctx:
176-
177-
# Create the inputs for the ouput operations
178-
output_specs = [Input(value=value,
179-
definition=self.definitions[def_name],
180-
parents=False) \
181-
for value, def_name in self.output_specs]
182-
183-
# Add our inputs to the input network with the context being the
184-
# repo src_url
185-
async for repo in sources.repos():
186-
inputs = []
187-
for value, def_name in self.inputs:
188-
inputs.append(Input(value=value,
189-
definition=self.definitions[def_name],
190-
parents=False))
191-
if self.repo_def:
192-
inputs.append(Input(value=repo.src_url,
193-
definition=self.definitions[self.repo_def],
194-
parents=False))
195-
196-
await dffctx.ictx.add(
197-
MemoryInputSet(
198-
MemoryInputSetConfig(
199-
ctx=StringInputSetContext(repo.src_url),
200-
inputs=inputs + output_specs
162+
async with self.dff as dff:
163+
# Orchestrate the running of these operations
164+
async with dff() as dffctx:
165+
# Create the inputs for the ouput operations
166+
output_specs = [Input(value=value,
167+
definition=self.definitions[def_name],
168+
parents=False) \
169+
for value, def_name in self.output_specs]
170+
171+
# Add our inputs to the input network with the context being the
172+
# repo src_url
173+
async for repo in sources.repos():
174+
inputs = []
175+
for value, def_name in self.inputs:
176+
inputs.append(Input(value=value,
177+
definition=self.definitions[def_name],
178+
parents=False))
179+
if self.repo_def:
180+
inputs.append(Input(value=repo.src_url,
181+
definition=self.definitions[self.repo_def],
182+
parents=False))
183+
184+
await dffctx.ictx.add(
185+
MemoryInputSet(
186+
MemoryInputSetConfig(
187+
ctx=StringInputSetContext(repo.src_url),
188+
inputs=inputs + output_specs
189+
)
201190
)
202191
)
203-
)
204-
205-
async for ctx, results in dffctx.evaluate():
206-
ctx_str = (await ctx.handle()).as_string()
207-
# TODO Make a RepoInputSetContext which would let us store the
208-
# repo instead of recalling it by the URL
209-
repo = await sources.repo(ctx_str)
210-
# Remap the output operations to their feature
211-
remap = {}
212-
for output_operation_name, sub, feature_name in self.remap:
213-
if not output_operation_name in results:
214-
self.logger.error('[%s] results do not contain %s: %s',
215-
ctx_str,
216-
output_operation_name,
217-
results)
218-
continue
219-
if not sub in results[output_operation_name]:
220-
self.logger.error('[%s] %s does not contain: %s',
221-
ctx_str,
222-
sub,
223-
results[output_operation_name])
224-
continue
225-
remap[feature_name] = results[output_operation_name][sub]
226-
# Store the results
227-
repo.evaluated(remap)
228-
yield repo
229-
if self.update:
230-
await sources.update(repo)
192+
193+
async for ctx, results in dffctx.evaluate(strict=True):
194+
ctx_str = (await ctx.handle()).as_string()
195+
# TODO Make a RepoInputSetContext which would let us store the
196+
# repo instead of recalling it by the URL
197+
repo = await sources.repo(ctx_str)
198+
# Remap the output operations to their feature
199+
remap = {}
200+
for output_operation_name, sub, feature_name in self.remap:
201+
if not output_operation_name in results:
202+
self.logger.error('[%s] results do not contain %s: %s',
203+
ctx_str,
204+
output_operation_name,
205+
results)
206+
continue
207+
if not sub in results[output_operation_name]:
208+
self.logger.error('[%s] %s does not contain: %s',
209+
ctx_str,
210+
sub,
211+
results[output_operation_name])
212+
continue
213+
remap[feature_name] = results[output_operation_name][sub]
214+
# Store the results
215+
repo.evaluated(remap)
216+
yield repo
217+
if self.update:
218+
await sources.update(repo)
231219

232220
async def run(self):
233221
# async with self.sources as sources, self.features as features:

dffml/df/base.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@ def args(cls) -> Dict[str, Arg]:
2424
def config(cls, cmd: CMD):
2525
pass
2626

27+
@classmethod
28+
def withconfig(cls, cmd: CMD):
29+
return cls(cls.config(cmd))
30+
2731
class OperationImplementationContext(abc.ABC):
2832

2933
def __init__(self,
@@ -81,7 +85,10 @@ async def __call__(self,
8185
ctx: 'BaseInputSetContext',
8286
ictx: 'BaseInputNetworkContext') \
8387
-> OperationImplementationContext:
84-
return OperationImplementationContext(self, ictx)
88+
return OperationImplementationContext(self, ctx, ictx)
89+
90+
async def __aenter__(self) -> 'OperationImplementation':
91+
return self
8592

8693
async def __aexit__(self, exc_type, exc_value, traceback):
8794
pass
@@ -647,20 +654,20 @@ def __init__(self,
647654
self.logger = LOGGER.getChild(self.__class__.__qualname__)
648655

649656
@abc.abstractmethod
650-
async def run_operations(self) \
657+
async def run_operations(self, strict: bool = False) \
651658
-> AsyncIterator[Tuple[BaseContextHandle, Dict[str, Any]]]:
652659
'''
653660
Run all Stage.PROCESSING operations
654661
'''
655662
pass
656663

657-
async def run_until_complete(self) \
664+
async def run_until_complete(self, strict: bool = False) \
658665
-> AsyncIterator[Tuple[BaseContextHandle, Dict[str, Any]]]:
659666
'''
660667
Run all the operations then run cleanup and output operations
661668
'''
662669
# Run all operations until no more are run
663-
async for ctx, results in self.run_operations():
670+
async for ctx, results in self.run_operations(strict=strict):
664671
yield ctx, results
665672

666673
async def __aenter__(self) -> 'BaseOrchestratorContext':

dffml/df/dff.py

Lines changed: 47 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -16,22 +16,22 @@ def __init__(self,
1616
operation_network: BaseOperationNetwork,
1717
lock_network: BaseLockNetwork,
1818
rchecker: BaseRedundancyChecker,
19-
opimpn: BaseOperationImplementationNetwork,
19+
opimp_network: BaseOperationImplementationNetwork,
2020
orchestrator: BaseOrchestrator) -> None:
2121
self.input_network = input_network
2222
self.operation_network = operation_network
2323
self.lock_network = lock_network
2424
self.rchecker = rchecker
25-
self.opimp_network = opimpn
25+
self.opimp_network = opimp_network
2626
self.orchestrator = orchestrator
2727
self.logger = LOGGER.getChild(self.__class__.__qualname__)
2828
self.__stack = None
2929

30-
async def evaluate(self):
30+
async def evaluate(self, strict=False):
3131
# Orchestrate the running of these operations
3232
async with self.orchestrator(self.ictx, self.octx, self.lctx, self.nctx,
3333
self.rctx) as orchestrate:
34-
async for ctx, results in orchestrate.run_until_complete():
34+
async for ctx, results in orchestrate.run_until_complete(strict=strict):
3535
yield ctx, results
3636

3737
async def __aenter__(self) -> 'DataFlowFacilitatorContext':
@@ -42,15 +42,15 @@ async def __aenter__(self) -> 'DataFlowFacilitatorContext':
4242
self.__stack = AsyncExitStack()
4343
await self.__stack.__aenter__()
4444
self.rctx = await self.__stack.enter_async_context(
45-
self.rchecker())
45+
self.rchecker)
4646
self.ictx = await self.__stack.enter_async_context(
47-
self.input_network())
47+
self.input_network)
4848
self.octx = await self.__stack.enter_async_context(
49-
self.operation_network())
49+
self.operation_network)
5050
self.lctx = await self.__stack.enter_async_context(
51-
self.lock_network())
51+
self.lock_network)
5252
self.nctx = await self.__stack.enter_async_context(
53-
self.opimp_network())
53+
self.opimp_network)
5454
return self
5555

5656
async def __aexit__(self, exc_type, exc_value, traceback):
@@ -61,20 +61,45 @@ class DataFlowFacilitator(object):
6161
Data Flow Facilitator-tots
6262
'''
6363

64-
def __init__(self) -> None:
65-
self.logger = LOGGER.getChild(self.__class__.__qualname__)
66-
67-
def __call__(self,
64+
def __init__(self,
6865
input_network: BaseInputNetwork,
6966
operation_network: BaseOperationNetwork,
7067
lock_network: BaseLockNetwork,
7168
rchecker: BaseRedundancyChecker,
72-
opimpn: BaseOperationImplementationNetwork,
73-
orchestrator: BaseOrchestrator) \
74-
-> 'DataFlowFacilitatorContext':
75-
return DataFlowFacilitatorContext(input_network,
76-
operation_network,
77-
lock_network,
78-
rchecker,
79-
opimpn,
80-
orchestrator)
69+
opimp_network: BaseOperationImplementationNetwork,
70+
orchestrator: BaseOrchestrator) -> None:
71+
self.rchecker = rchecker
72+
self.input_network = input_network
73+
self.operation_network = operation_network
74+
self.lock_network = lock_network
75+
self.opimp_network = opimp_network
76+
self.orchestrator = orchestrator
77+
self.logger = LOGGER.getChild(self.__class__.__qualname__)
78+
79+
def __call__(self) -> 'DataFlowFacilitatorContext':
80+
return DataFlowFacilitatorContext(self.input_network(),
81+
self.operation_network(),
82+
self.lock_network(),
83+
self.rchecker(),
84+
self.opimp_network(),
85+
self.orchestrator)
86+
87+
async def __aenter__(self) -> 'DataFlowFacilitator':
88+
self.__stack = AsyncExitStack()
89+
await self.__stack.__aenter__()
90+
# self.rchecker = await self.__stack.enter_async_context(
91+
# self.rchecker)
92+
# self.input_network = await self.__stack.enter_async_context(
93+
# self.input_network)
94+
# self.operation_network = await self.__stack.enter_async_context(
95+
# self.operation_network)
96+
# self.lock_network = await self.__stack.enter_async_context(
97+
# self.lock_network)
98+
self.opimp_network = await self.__stack.enter_async_context(
99+
self.opimp_network)
100+
# self.orchestrator = await self.__stack.enter_async_context(
101+
# self.orchestrator)
102+
return self
103+
104+
async def __aexit__(self, exc_type, exc_value, traceback):
105+
await self.__stack.aclose()

0 commit comments

Comments
 (0)