1: <?php
2: /**
3: * CakePHP(tm) : Rapid Development Framework (https://cakephp.org)
4: * Copyright (c) Cake Software Foundation, Inc. (https://cakefoundation.org)
5: *
6: * Licensed under The MIT License
7: * For full copyright and license information, please see the LICENSE.txt
8: * Redistributions of files must retain the above copyright notice.
9: *
10: * @copyright Copyright (c) Cake Software Foundation, Inc. (https://cakefoundation.org)
11: * @link https://cakephp.org CakePHP(tm) Project
12: * @license https://opensource.org/licenses/mit-license.php MIT License
13: */
14: namespace Cake\Collection\Iterator;
15:
16: use ArrayIterator;
17: use IteratorAggregate;
18: use LogicException;
19: use Traversable;
20:
21: /**
22: * Implements a simplistic version of the popular Map-Reduce algorithm. Acts
23: * like an iterator for the original passed data after each result has been
24: * processed, thus offering a transparent wrapper for results coming from any
25: * source.
26: */
class MapReduce implements IteratorAggregate
{

    /**
     * Buckets of values emitted during the map phase, indexed by bucket name.
     * Each bucket is a list of the values passed to emitIntermediate().
     *
     * @var array
     */
    protected $_intermediate = [];

    /**
     * Final results, as emitted through emit() (normally during the reduce phase).
     *
     * @var array
     */
    protected $_result = [];

    /**
     * Whether the Map-Reduce routine has already run to completion on the data.
     *
     * @var bool
     */
    protected $_executed = false;

    /**
     * The original data to be processed. Set to null once the map phase has
     * iterated it completely.
     *
     * @var \Traversable|null
     */
    protected $_data;

    /**
     * A callable that will be executed for each record in the original data.
     *
     * @var callable
     */
    protected $_mapper;

    /**
     * A callable that will be executed for each intermediate bucket created
     * during the map phase.
     *
     * @var callable|null
     */
    protected $_reducer;

    /**
     * Number of times emit() has been called. Used as the key for results that
     * are emitted without an explicit key.
     *
     * @var int
     */
    protected $_counter = 0;

    /**
     * Constructor
     *
     * ### Example:
     *
     * Separate all unique odd and even numbers in an array
     *
     * ```
     *  $data = new \ArrayObject([1, 2, 3, 4, 5, 3]);
     *  $mapper = function ($value, $key, $mr) {
     *      $type = ($value % 2 === 0) ? 'even' : 'odd';
     *      $mr->emitIntermediate($value, $type);
     *  };
     *
     *  $reducer = function ($numbers, $type, $mr) {
     *      $mr->emit(array_unique($numbers), $type);
     *  };
     *  $results = new MapReduce($data, $mapper, $reducer);
     * ```
     *
     * Previous example will generate the following result:
     *
     * ```
     *  ['odd' => [1, 3, 5], 'even' => [2, 4]]
     * ```
     *
     * @param \Traversable $data the original data to be processed
     * @param callable $mapper the mapper callback. This function will receive 3 arguments.
     * The first one is the current value, second the current results key and third is
     * this class instance so you can call the result emitters.
     * @param callable|null $reducer the reducer callback. This function will receive 3 arguments.
     * The first one is the list of values inside a bucket, second one is the name
     * of the bucket that was created during the mapping phase and third one is an
     * instance of this class.
     */
    public function __construct(Traversable $data, callable $mapper, callable $reducer = null)
    {
        $this->_data = $data;
        $this->_mapper = $mapper;
        $this->_reducer = $reducer;
    }

    /**
     * Returns an iterator with the end result of running the Map and Reduce
     * phases on the original data. The phases are run lazily, the first time
     * this method is called.
     *
     * The attribute below silences the PHP >= 8.1 deprecation notice about the
     * missing native return type; older PHP versions parse it as a comment.
     *
     * @return \ArrayIterator
     * @throws \LogicException if emitIntermediate was called but no reducer function
     * was provided
     */
    #[\ReturnTypeWillChange]
    public function getIterator()
    {
        if (!$this->_executed) {
            $this->_execute();
        }

        return new ArrayIterator($this->_result);
    }

    /**
     * Appends a new record to the bucket labelled with $bucket, usually as a result
     * of mapping a single record from the original data.
     *
     * @param mixed $val The record itself to store in the bucket
     * @param string $bucket the name of the bucket where to put the record
     * @return void
     */
    public function emitIntermediate($val, $bucket)
    {
        $this->_intermediate[$bucket][] = $val;
    }

    /**
     * Appends a new record to the final list of results and optionally assigns a key
     * for this record.
     *
     * When no key is given, the number of previous emit() calls is used as the key.
     * That counter advances on every call, including calls with an explicit key.
     *
     * @param mixed $val The value to be appended to the final list of results
     * @param string|null $key an optional key to assign to the value
     * @return void
     */
    public function emit($val, $key = null)
    {
        $this->_result[$key === null ? $this->_counter : $key] = $val;
        $this->_counter++;
    }

    /**
     * Runs the actual Map-Reduce algorithm. This iterates the original data
     * and calls the mapper function for each record, then calls the reducer
     * function for each intermediate bucket created during the map phase.
     *
     * @return void
     * @throws \LogicException if emitIntermediate was called but no reducer function
     * was provided
     */
    protected function _execute()
    {
        // The source data is dropped once it has been mapped. If a previous run
        // failed after the map phase (missing reducer, or a reducer that threw),
        // do not iterate the now-null data again: that would raise a PHP warning.
        if ($this->_data !== null) {
            $mapper = $this->_mapper;
            foreach ($this->_data as $key => $val) {
                $mapper($val, $key, $this);
            }
            $this->_data = null;
        }

        if (!empty($this->_intermediate) && $this->_reducer === null) {
            throw new LogicException('No reducer function was provided');
        }

        $reducer = $this->_reducer;
        foreach ($this->_intermediate as $key => $list) {
            $reducer($list, $key, $this);
        }
        $this->_intermediate = [];
        $this->_executed = true;
    }
}
194: