fix bug:MultiPagePipeline and DoubleKeyMap concurrent bug
parent
6b9d21fcf3
commit
74962d69b9
|
@ -36,51 +36,61 @@ public class MultiPagePipeline implements Pipeline {
|
||||||
private void handleObject(Iterator<Map.Entry<String, Object>> iterator) {
|
private void handleObject(Iterator<Map.Entry<String, Object>> iterator) {
|
||||||
Map.Entry<String, Object> objectEntry = iterator.next();
|
Map.Entry<String, Object> objectEntry = iterator.next();
|
||||||
Object o = objectEntry.getValue();
|
Object o = objectEntry.getValue();
|
||||||
|
//需要拼凑
|
||||||
if (o instanceof MultiPageModel) {
|
if (o instanceof MultiPageModel) {
|
||||||
MultiPageModel multiPageModel = (MultiPageModel) o;
|
MultiPageModel multiPageModel = (MultiPageModel) o;
|
||||||
pageMap.put(multiPageModel.getPageKey(), multiPageModel.getPage(), Boolean.TRUE);
|
//这次处理的部分,设置为完成
|
||||||
if (multiPageModel.getOtherPages() != null) {
|
pageMap.put(multiPageModel.getPageKey(), multiPageModel.getPage(), Boolean.FALSE);
|
||||||
for (String otherPage : multiPageModel.getOtherPages()) {
|
//每个key单独加锁
|
||||||
Boolean aBoolean = pageMap.get(multiPageModel.getPageKey(), otherPage);
|
synchronized (pageMap.get(multiPageModel.getPageKey())) {
|
||||||
if (aBoolean == null) {
|
pageMap.put(multiPageModel.getPageKey(), multiPageModel.getPage(), Boolean.TRUE);
|
||||||
pageMap.put(multiPageModel.getPageKey(), otherPage, Boolean.FALSE);
|
//其他需要拼凑的部分
|
||||||
}
|
if (multiPageModel.getOtherPages() != null) {
|
||||||
}
|
for (String otherPage : multiPageModel.getOtherPages()) {
|
||||||
}
|
Boolean aBoolean = pageMap.get(multiPageModel.getPageKey(), otherPage);
|
||||||
//check if all pages are processed
|
if (aBoolean == null) {
|
||||||
Map<String, Boolean> booleanMap = pageMap.get(multiPageModel.getPageKey());
|
pageMap.put(multiPageModel.getPageKey(), otherPage, Boolean.FALSE);
|
||||||
objectMap.put(multiPageModel.getPageKey(), multiPageModel.getPage(), multiPageModel);
|
|
||||||
if (booleanMap == null) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
for (Map.Entry<String, Boolean> stringBooleanEntry : booleanMap.entrySet()) {
|
|
||||||
if (!stringBooleanEntry.getValue()) {
|
|
||||||
iterator.remove();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
List<Map.Entry<String, MultiPageModel>> entryList = new ArrayList<Map.Entry<String, MultiPageModel>>();
|
|
||||||
entryList.addAll(objectMap.get(multiPageModel.getPageKey()).entrySet());
|
|
||||||
if (entryList.size() != 0) {
|
|
||||||
Collections.sort(entryList, new Comparator<Map.Entry<String, MultiPageModel>>() {
|
|
||||||
@Override
|
|
||||||
public int compare(Map.Entry<String, MultiPageModel> o1, Map.Entry<String, MultiPageModel> o2) {
|
|
||||||
try {
|
|
||||||
int i1 = Integer.parseInt(o1.getKey());
|
|
||||||
int i2 = Integer.parseInt(o2.getKey());
|
|
||||||
return i1 - i2;
|
|
||||||
} catch (NumberFormatException e) {
|
|
||||||
return o1.getKey().compareTo(o2.getKey());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
|
||||||
MultiPageModel value = entryList.get(0).getValue();
|
|
||||||
for (int i = 1; i < entryList.size(); i++) {
|
|
||||||
value = value.combine(entryList.get(i).getValue());
|
|
||||||
}
|
}
|
||||||
objectEntry.setValue(value);
|
//check if all pages are processed
|
||||||
|
Map<String, Boolean> booleanMap = pageMap.get(multiPageModel.getPageKey());
|
||||||
|
objectMap.put(multiPageModel.getPageKey(), multiPageModel.getPage(), multiPageModel);
|
||||||
|
if (booleanMap == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// /过滤,这次完成的page item中,还未拼凑完整的item,不进入下一个pipeline
|
||||||
|
for (Map.Entry<String, Boolean> stringBooleanEntry : booleanMap.entrySet()) {
|
||||||
|
if (!stringBooleanEntry.getValue()) {
|
||||||
|
iterator.remove();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
List<Map.Entry<String, MultiPageModel>> entryList = new ArrayList<Map.Entry<String, MultiPageModel>>();
|
||||||
|
entryList.addAll(objectMap.get(multiPageModel.getPageKey()).entrySet());
|
||||||
|
if (entryList.size() != 0) {
|
||||||
|
Collections.sort(entryList, new Comparator<Map.Entry<String, MultiPageModel>>() {
|
||||||
|
@Override
|
||||||
|
public int compare(Map.Entry<String, MultiPageModel> o1, Map.Entry<String, MultiPageModel> o2) {
|
||||||
|
try {
|
||||||
|
int i1 = Integer.parseInt(o1.getKey());
|
||||||
|
int i2 = Integer.parseInt(o2.getKey());
|
||||||
|
return i1 - i2;
|
||||||
|
} catch (NumberFormatException e) {
|
||||||
|
return o1.getKey().compareTo(o2.getKey());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
// 合并
|
||||||
|
MultiPageModel value = entryList.get(0).getValue();
|
||||||
|
for (int i = 1; i < entryList.size(); i++) {
|
||||||
|
value = value.combine(entryList.get(i).getValue());
|
||||||
|
}
|
||||||
|
objectEntry.setValue(value);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -75,8 +75,9 @@ public class DoubleKeyMap<K1, K2, V> extends MultiKeyMapBase {
|
||||||
* @param value
|
* @param value
|
||||||
* @return value
|
* @return value
|
||||||
*/
|
*/
|
||||||
public V put(K1 key1, K2 key2, V value) {
|
public synchronized V put(K1 key1, K2 key2, V value) {
|
||||||
if (map.get(key1) == null) {
|
if (map.get(key1) == null) {
|
||||||
|
//不加锁的话,多个线程有可能都会执行到这里
|
||||||
map.put(key1, this.<K2, V>newMap());
|
map.put(key1, this.<K2, V>newMap());
|
||||||
}
|
}
|
||||||
return get(key1).put(key2, value);
|
return get(key1).put(key2, value);
|
||||||
|
@ -87,7 +88,7 @@ public class DoubleKeyMap<K1, K2, V> extends MultiKeyMapBase {
|
||||||
* @param key2
|
* @param key2
|
||||||
* @return value
|
* @return value
|
||||||
*/
|
*/
|
||||||
public V remove(K1 key1, K2 key2) {
|
public synchronized V remove(K1 key1, K2 key2) {
|
||||||
if (get(key1) == null) {
|
if (get(key1) == null) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue