From 72e2d4c9763d186377a05b333495e5669c595165 Mon Sep 17 00:00:00 2001 From: Alexander Scherbatiy Date: Wed, 7 Aug 2019 12:13:02 +0300 Subject: [PATCH 01/11] Add initial version of ParallelGetLink --- opencog/atoms/atom_types/atom_types.script | 2 + opencog/atoms/core/ScopeLink.cc | 15 ++++ opencog/atoms/core/ScopeLink.h | 2 + opencog/atoms/pattern/CMakeLists.txt | 2 + opencog/atoms/pattern/ParallelGetLink.cc | 94 ++++++++++++++++++++++ opencog/atoms/pattern/ParallelGetLink.h | 59 ++++++++++++++ opencog/atoms/pattern/PatternLink.cc | 4 +- 7 files changed, 177 insertions(+), 1 deletion(-) create mode 100644 opencog/atoms/pattern/ParallelGetLink.cc create mode 100644 opencog/atoms/pattern/ParallelGetLink.h diff --git a/opencog/atoms/atom_types/atom_types.script b/opencog/atoms/atom_types/atom_types.script index 2f724a23f1..201c391473 100644 --- a/opencog/atoms/atom_types/atom_types.script +++ b/opencog/atoms/atom_types/atom_types.script @@ -386,6 +386,7 @@ SATISFYING_LINK <- PATTERN_LINK // QueryLink is identical to BindLink, except it returns a LinkValue // holding the result, instead of a SetLink. (Less atomspace pollution). GET_LINK <- SATISFYING_LINK // Finds all groundings, returns them +PARALLEL_GET_LINK <- SATISFYING_LINK // Finds all groundings, returns them QUERY_LINK <- SATISFYING_LINK // Finds all groundings, substitutes. BIND_LINK <- QUERY_LINK // Finds all groundings, substitutes. @@ -489,6 +490,7 @@ INTERVAL_LINK <- ORDERED_LINK // ListLink, to indicate that they are either done, or awaiting // processing. ANCHOR_NODE <- NODE +SET_NODE <- NODE // ==================================================================== // Relations. A "relation" is a subset of a Cartesian product, or more diff --git a/opencog/atoms/core/ScopeLink.cc b/opencog/atoms/core/ScopeLink.cc index 4e85df03a7..73b438c009 100644 --- a/opencog/atoms/core/ScopeLink.cc +++ b/opencog/atoms/core/ScopeLink.cc @@ -101,6 +101,21 @@ void ScopeLink::extract_variables(const HandleSeq& oset) if (UNQUOTE_LINK == decls) return; + // oset: + // variables + // body + // target + if (oset.size() == 3) + { + _vardecl = oset[0]; + _body = oset[1]; + _target = oset[2]; + + // Initialize _varlist with the scoped variables + init_scoped_variables(_vardecl); + return; + } + // If the first atom is not explicitly a variable declaration, then // there are no variable declarations. There are two cases that can // apply here: either the body is a lambda, in which case, we copy diff --git a/opencog/atoms/core/ScopeLink.h b/opencog/atoms/core/ScopeLink.h index 64f2d10fc4..6397636fbe 100644 --- a/opencog/atoms/core/ScopeLink.h +++ b/opencog/atoms/core/ScopeLink.h @@ -56,6 +56,8 @@ class ScopeLink : public Link /// Handle of the body of the expression. Handle _body; + Handle _target; + /// Variables bound in the body. Variables _varlist; diff --git a/opencog/atoms/pattern/CMakeLists.txt b/opencog/atoms/pattern/CMakeLists.txt index 61dac4f281..528ede5930 100644 --- a/opencog/atoms/pattern/CMakeLists.txt +++ b/opencog/atoms/pattern/CMakeLists.txt @@ -6,6 +6,7 @@ ADD_LIBRARY (lambda BindLink.cc DualLink.cc GetLink.cc + ParallelGetLink.cc PatternLink.cc PatternTerm.cc PatternUtils.cc @@ -32,6 +33,7 @@ INSTALL (FILES BindLink.h DualLink.h GetLink.h + ParallelGetLink.h PatternLink.h Pattern.h PatternTerm.h diff --git a/opencog/atoms/pattern/ParallelGetLink.cc b/opencog/atoms/pattern/ParallelGetLink.cc new file mode 100644 index 0000000000..e35bcc8697 --- /dev/null +++ b/opencog/atoms/pattern/ParallelGetLink.cc @@ -0,0 +1,94 @@ +/* + * ParallelGetLink.cc + * + * Copyright (C) 2019 Linas Vepstas + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License v3 as + * published by the Free Software Foundation and including the + * exceptions + * at http://opencog.org/wiki/Licenses + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public + * License + * along with this program; if not, write to: + * Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#include +#include +#include + +#include "ParallelGetLink.h" + +using namespace opencog; + +void ParallelGetLink::init(void) +{ + Type t = get_type(); + if (not nameserver().isA(t, PARALLEL_GET_LINK)) + { + const std::string& tname = nameserver().getTypeName(t); + throw InvalidParamException(TRACE_INFO, + "Expecting a ParallelGetLink, got %s", tname.c_str()); + } +} + +ParallelGetLink::ParallelGetLink(const HandleSeq& hseq, Type t) + : PatternLink(hseq, t) +{ + init(); +} + +ParallelGetLink::ParallelGetLink(const Link &l) + : PatternLink(l) +{ + init(); +} + +/* ================================================================= */ + +HandleSet ParallelGetLink::do_execute(AtomSpace* as, bool silent) +{ + if (nullptr == as) as = _atom_space; + + SatisfyingSet sater(as); + this->satisfy(sater); + + return sater._satisfying_set; +} + +ValuePtr ParallelGetLink::execute(AtomSpace* as, bool silent) +{ + + HandleSet handle_set = do_execute(as, silent); + Handle satset(createUnorderedLink(handle_set, SET_LINK)); + + for (auto h: handle_set) + { + HandleSeq handle_seq; + handle_seq.push_back(h); + handle_seq.push_back(_target); + Handle member_link = createLink(handle_seq, MEMBER_LINK); + +#define PLACE_RESULTS_IN_ATOMSPACE +#ifdef PLACE_RESULTS_IN_ATOMSPACE + // Shoot. XXX FIXME. Most of the unit tests require that the atom + // that we return is in the atomspace. But it would be nice if we + // could defer this indefinitely, until its really needed. + if (as) member_link = as->add_atom(member_link); +#endif /* PLACE_RESULTS_IN_ATOMSPACE */ + } + + return _target; +} + +DEFINE_LINK_FACTORY(ParallelGetLink, PARALLEL_GET_LINK) + +/* ===================== END OF FILE ===================== */ diff --git a/opencog/atoms/pattern/ParallelGetLink.h b/opencog/atoms/pattern/ParallelGetLink.h new file mode 100644 index 0000000000..51ea213f1f --- /dev/null +++ b/opencog/atoms/pattern/ParallelGetLink.h @@ -0,0 +1,59 @@ +/* + * opencog/atoms/pattern/ParallelGetLink.h + * + * Copyright (C) 2019 Linas Vepstas + * All Rights Reserved + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License v3 as + * published by the Free Software Foundation and including the exceptions + * at http://opencog.org/wiki/Licenses + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program; if not, write to: + * Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ +#ifndef _OPENCOG_PARALLEL_GET_LINK_H +#define _OPENCOG_PARALLEL_GET_LINK_H + +#include + +namespace opencog +{ +/** \addtogroup grp_atomspace + * @{ + */ +class ParallelGetLink : public PatternLink +{ +protected: + void init(void); + virtual HandleSet do_execute(AtomSpace*, bool silent); + +public: + ParallelGetLink(const HandleSeq&, Type=PARALLEL_GET_LINK); + explicit ParallelGetLink(const Link &l); + + virtual bool is_executable() const { return true; } + virtual ValuePtr execute(AtomSpace*, bool silent=false); + + static Handle factory(const Handle&); +}; + +typedef std::shared_ptr ParallelGetLinkPtr; +static inline ParallelGetLinkPtr ParallelGetLinkCast(const Handle& h) + { AtomPtr a(h); return std::dynamic_pointer_cast(a); } +static inline ParallelGetLinkPtr ParallelGetLinkCast(AtomPtr a) + { return std::dynamic_pointer_cast(a); } + +#define createParallelGetLink std::make_shared + +/** @}*/ +} + +#endif // _OPENCOG_PARALLEL_GET_LINK_H diff --git a/opencog/atoms/pattern/PatternLink.cc b/opencog/atoms/pattern/PatternLink.cc index dda1fcfd32..a6ed1974f6 100644 --- a/opencog/atoms/pattern/PatternLink.cc +++ b/opencog/atoms/pattern/PatternLink.cc @@ -147,7 +147,9 @@ void PatternLink::init(void) // not set. if (nullptr == _body) return; - if (2 < _outgoing.size() or + // TODO: Update size check conditions + if (3 < _outgoing.size() or + (1 == _outgoing.size() and _outgoing[0] != _body) or (2 == _outgoing.size() and _outgoing[1] != _body)) { throw InvalidParamException(TRACE_INFO, From 62ad46ca668fce06d2ad93c7f74053a6fc7cdc4a Mon Sep 17 00:00:00 2001 From: Alexander Scherbatiy Date: Wed, 7 Aug 2019 14:24:20 +0300 Subject: [PATCH 02/11] Remove unused satset from ParallelGetLink --- opencog/atoms/pattern/ParallelGetLink.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/opencog/atoms/pattern/ParallelGetLink.cc b/opencog/atoms/pattern/ParallelGetLink.cc index e35bcc8697..e7ef8d56e9 100644 --- a/opencog/atoms/pattern/ParallelGetLink.cc +++ b/opencog/atoms/pattern/ParallelGetLink.cc @@ -68,7 +68,6 @@ ValuePtr ParallelGetLink::execute(AtomSpace* as, bool silent) { HandleSet handle_set = do_execute(as, silent); - Handle satset(createUnorderedLink(handle_set, SET_LINK)); for (auto h: handle_set) { From 29719edee9d1cfba5d50552b5496aab76e400091 Mon Sep 17 00:00:00 2001 From: Alexander Scherbatiy Date: Wed, 7 Aug 2019 14:24:57 +0300 Subject: [PATCH 03/11] Handle SetNode in PutLink --- opencog/atoms/core/PutLink.cc | 35 ++++++++++++++++++++++++----------- 1 file changed, 24 insertions(+), 11 deletions(-) diff --git a/opencog/atoms/core/PutLink.cc b/opencog/atoms/core/PutLink.cc index fd0c5306cf..0df724a032 100644 --- a/opencog/atoms/core/PutLink.cc +++ b/opencog/atoms/core/PutLink.cc @@ -25,6 +25,7 @@ #include "DefineLink.h" #include "LambdaLink.h" #include "PutLink.h" +#include using namespace opencog; @@ -431,24 +432,36 @@ Handle PutLink::do_reduce(void) const // If there is only one variable in the PutLink body... if (1 == nvars) { - if (SET_LINK != vtype) + + if (SET_LINK == vtype) { - return reddy(subs, {args}); + // If the arguments are given in a set, then iterate over the set... + HandleSeq bset; + for (const Handle& h : args->getOutgoingSet()) + { + HandleSeq oset; + oset.emplace_back(h); + try + { + bset.emplace_back(reddy(subs, oset)); + } + catch (const TypeCheckException& ex) {} + } + return createLink(bset, SET_LINK); } - // If the arguments are given in a set, then iterate over the set... - HandleSeq bset; - for (const Handle& h : args->getOutgoingSet()) + if (SET_NODE == vtype) { - HandleSeq oset; - oset.emplace_back(h); - try + AtomSpace* as = getAtomSpace(); + for (const LinkPtr& lp : args->getIncomingSetByType(MEMBER_LINK)) { - bset.emplace_back(reddy(subs, oset)); + Handle h = lp->getOutgoingAtom(0); + if (as) as->add_atom(reddy(subs, {h})); } - catch (const TypeCheckException& ex) {} + return args; } - return createLink(bset, SET_LINK); + + return reddy(subs, {args}); } // If we are here, then there are multiple variables in the body. From 82e990ff398e7312ea8677871841fc85c6a996eb Mon Sep 17 00:00:00 2001 From: Alexander Scherbatiy Date: Thu, 8 Aug 2019 13:56:35 +0300 Subject: [PATCH 04/11] Add initial version of ParallelSatisfier --- opencog/atoms/pattern/ParallelGetLink.cc | 2 +- opencog/query/PatternLinkRuntime.cc | 1 + opencog/query/Satisfier.cc | 49 ++++++++++++++++++++++++ opencog/query/Satisfier.h | 32 ++++++++++++++++ 4 files changed, 83 insertions(+), 1 deletion(-) diff --git a/opencog/atoms/pattern/ParallelGetLink.cc b/opencog/atoms/pattern/ParallelGetLink.cc index e7ef8d56e9..0344272d55 100644 --- a/opencog/atoms/pattern/ParallelGetLink.cc +++ b/opencog/atoms/pattern/ParallelGetLink.cc @@ -58,7 +58,7 @@ HandleSet ParallelGetLink::do_execute(AtomSpace* as, bool silent) { if (nullptr == as) as = _atom_space; - SatisfyingSet sater(as); + ParallelSatisfier sater(as); this->satisfy(sater); return sater._satisfying_set; diff --git a/opencog/query/PatternLinkRuntime.cc b/opencog/query/PatternLinkRuntime.cc index 1d232a874b..c587373fa8 100644 --- a/opencog/query/PatternLinkRuntime.cc +++ b/opencog/query/PatternLinkRuntime.cc @@ -325,6 +325,7 @@ bool PatternLink::satisfy(PatternMatchCallback& pmcb) const // in a direct fashion. if (_num_comps <= 1) { + PatternMatchEngine pme(pmcb); debug_log(); diff --git a/opencog/query/Satisfier.cc b/opencog/query/Satisfier.cc index 66b6b335c3..faac68ee8e 100644 --- a/opencog/query/Satisfier.cc +++ b/opencog/query/Satisfier.cc @@ -141,10 +141,59 @@ bool SatisfyingSet::grounding(const HandleMap &var_soln, { vargnds.push_back(var_soln.at(hv)); } + + _satisfying_set.emplace(createLink(vargnds, LIST_LINK)); + + // If we found as many as we want, then stop looking for more. + return (_satisfying_set.size() >= max_results); +} + +// =========================================================== + +bool ParallelSatisfier::grounding(const HandleMap &var_soln, + const HandleMap &term_soln) +{ + // PatternMatchEngine::log_solution(var_soln, term_soln); + + // Do not accept new solution if maximum number has been already reached + if (_satisfying_set.size() >= max_results) + return true; + + if (1 == _varseq.size()) + { + // std::map::at() can throw. Rethrow for easier deubugging. + try + { + _satisfying_set.emplace(var_soln.at(_varseq[0])); + } + catch (...) + { + throw AssertionException(TRACE_INFO, + "Internal error: ungrounded variable %s\n", + _varseq[0]->to_string().c_str()); + } + + // If we found as many as we want, then stop looking for more. + return (_satisfying_set.size() >= max_results); + } + + // If more than one variable, encapsulate in sequential order, + // in a ListLink. + HandleSeq vargnds; + for (const Handle& hv : _varseq) + { + vargnds.push_back(var_soln.at(hv)); + } + _satisfying_set.emplace(createLink(vargnds, LIST_LINK)); // If we found as many as we want, then stop looking for more. return (_satisfying_set.size() >= max_results); } +bool ParallelSatisfier::search_finished(bool done) +{ + return done; +} + /* ===================== END OF FILE ===================== */ diff --git a/opencog/query/Satisfier.h b/opencog/query/Satisfier.h index 78af8e2417..e7c15a5907 100644 --- a/opencog/query/Satisfier.h +++ b/opencog/query/Satisfier.h @@ -121,6 +121,38 @@ class SatisfyingSet : const HandleMap &term_soln); }; +class ParallelSatisfier : + public virtual InitiateSearchCB, + public virtual DefaultPatternMatchCB +{ + public: + ParallelSatisfier(AtomSpace* as) : + InitiateSearchCB(as), DefaultPatternMatchCB(as), + max_results(SIZE_MAX) {} + + HandleSeq _varseq; + HandleSet _satisfying_set; + size_t max_results; + + virtual void set_pattern(const Variables& vars, + const Pattern& pat) + { + _varseq = vars.varseq; + InitiateSearchCB::set_pattern(vars, pat); + DefaultPatternMatchCB::set_pattern(vars, pat); + } + + // Return true if a satisfactory grounding has been + // found. Note that in case where you want all possible + // groundings, this will usually return false, so the + // patternMatchEngine can keep looking for ever more + // groundings. + virtual bool grounding(const HandleMap &var_soln, + const HandleMap &term_soln); + + virtual bool search_finished(bool); +}; + }; // namespace opencog #endif // _OPENCOG_SATISFIER_H From 076c8b2e0f61d8241b50df4b2d0878225326425f Mon Sep 17 00:00:00 2001 From: Alexander Scherbatiy Date: Thu, 8 Aug 2019 19:00:58 +0300 Subject: [PATCH 05/11] Add initial implementation of ProducerConsumerValue --- opencog/atoms/atom_types/atom_types.script | 1 + opencog/atoms/pattern/ParallelGetLink.cc | 2 +- opencog/atoms/value/CMakeLists.txt | 2 + opencog/atoms/value/ProducerConsumerValue.cc | 71 ++++++++++++ opencog/atoms/value/ProducerConsumerValue.h | 109 +++++++++++++++++++ opencog/query/Satisfier.cc | 5 +- opencog/query/Satisfier.h | 18 ++- 7 files changed, 204 insertions(+), 4 deletions(-) create mode 100644 opencog/atoms/value/ProducerConsumerValue.cc create mode 100644 opencog/atoms/value/ProducerConsumerValue.h diff --git a/opencog/atoms/atom_types/atom_types.script b/opencog/atoms/atom_types/atom_types.script index 201c391473..49febaf49f 100644 --- a/opencog/atoms/atom_types/atom_types.script +++ b/opencog/atoms/atom_types/atom_types.script @@ -23,6 +23,7 @@ FLOAT_VALUE <- VALUE // vector of floats, actually. STRING_VALUE <- VALUE // vector of strings LINK_VALUE <- VALUE // vector of values ("link" holding values) VALUATION <- VALUE // (atom,key,value) triple +PRODUCER_CONSUMER_VALUE <- VALUE // control value for producer/consumer task // =========================================================== // TruthValues are the subobject classifiers for Atomese; they are used diff --git a/opencog/atoms/pattern/ParallelGetLink.cc b/opencog/atoms/pattern/ParallelGetLink.cc index 0344272d55..ac524957a3 100644 --- a/opencog/atoms/pattern/ParallelGetLink.cc +++ b/opencog/atoms/pattern/ParallelGetLink.cc @@ -58,7 +58,7 @@ HandleSet ParallelGetLink::do_execute(AtomSpace* as, bool silent) { if (nullptr == as) as = _atom_space; - ParallelSatisfier sater(as); + ParallelSatisfier sater(as, _target); this->satisfy(sater); return sater._satisfying_set; diff --git a/opencog/atoms/value/CMakeLists.txt b/opencog/atoms/value/CMakeLists.txt index de65568d59..f0e9403bfe 100644 --- a/opencog/atoms/value/CMakeLists.txt +++ b/opencog/atoms/value/CMakeLists.txt @@ -7,6 +7,7 @@ ADD_LIBRARY (value RandomStream.cc StreamValue.cc StringValue.cc + ProducerConsumerValue.cc ValueFactory.cc ) @@ -29,6 +30,7 @@ INSTALL (FILES RandomStream.h StreamValue.h StringValue.h + ProducerConsumerValue.h ValueFactory.h DESTINATION "include/opencog/atoms/value" ) diff --git a/opencog/atoms/value/ProducerConsumerValue.cc b/opencog/atoms/value/ProducerConsumerValue.cc new file mode 100644 index 0000000000..46b78e8712 --- /dev/null +++ b/opencog/atoms/value/ProducerConsumerValue.cc @@ -0,0 +1,71 @@ +/* + * opencog/atoms/value/ProducerConsumerValue.cc + * + * Copyright (C) 2015, 2016 Linas Vepstas + * All Rights Reserved + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License v3 as + * published by the Free Software Foundation and including the exceptions + * at http://opencog.org/wiki/Licenses + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program; if not, write to: + * Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#include +#include + +using namespace opencog; + +const Handle ProducerConsumerValue::CONTROL_KEY = createNode(CONCEPT_NODE, "PRODUCER_CONSUMER_KEY"); + +bool ProducerConsumerValue::operator==(const Value& other) const +{ + if (PRODUCER_CONSUMER_VALUE != other.get_type()) return false; + + const ProducerConsumerValue* cv = (const ProducerConsumerValue*) &other; + return _name == cv->_name; +} + +std::string ProducerConsumerValue::to_string(const std::string& indent) const +{ + std::string rv = indent + "(" + nameserver().getTypeName(_type); + rv += std::string(" \"") + _name + "\")"; + return rv; +} + +// Adds factory when library is loaded. +DEFINE_VALUE_FACTORY(PRODUCER_CONSUMER_VALUE, + createProducerConsumerValue, std::string) + + +// ============================================================== + +void ProducerConsumerControl::produce(const Handle& h) +{ + _queue.push(h); +} + +void ProducerConsumerControl::subscribe(Consumer consume) +{ + _consume = consume; +} + +void ProducerConsumerControl::finished() +{ + if (!_consume) return; + + while (!_queue.empty()) + { + _consume(_queue.front()); + _queue.pop(); + } +} diff --git a/opencog/atoms/value/ProducerConsumerValue.h b/opencog/atoms/value/ProducerConsumerValue.h new file mode 100644 index 0000000000..079d87b1ef --- /dev/null +++ b/opencog/atoms/value/ProducerConsumerValue.h @@ -0,0 +1,109 @@ +/* + * opencog/atoms/value/ProducerConsumerValue.h + * + * Copyright (C) 2015, 2016 Linas Vepstas + * All Rights Reserved + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License v3 as + * published by the Free Software Foundation and including the exceptions + * at http://opencog.org/wiki/Licenses + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program; if not, write to: + * Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifndef _OPENCOG_PRODUCER_CONSUMER_CONTROL_VALUE_H +#define _OPENCOG_PRODUCER_CONSUMER_CONTROL_VALUE_H + +#include +#include +#include +#include +#include +#include +#include + +namespace opencog +{ + +/** \addtogroup grp_atomspace + * @{ + */ + +class ProducerConsumerControl; + +typedef void (Consumer)(Handle); +typedef std::queue HandleQueue; +typedef std::shared_ptr ProducerConsumerControlPtr; + +/** + * ProducerConsumerControlValues is a structure pass atoms from producer to consumer. + */ +class ProducerConsumerControl +{ +protected: + HandleQueue _queue; + void (*_consume)(Handle); + +public: + + virtual ~ProducerConsumerControl() {} + + virtual void produce(const Handle& h); + + virtual void subscribe(Consumer); + + virtual void finished(); +}; + +class ProducerConsumerValue + : public Value +{ +protected: + std::string _name; + ProducerConsumerControlPtr _control; + +public: + + static const Handle CONTROL_KEY; + + ProducerConsumerValue(const std::string& name) + : Value(PRODUCER_CONSUMER_VALUE), + _name(name), + _control(new ProducerConsumerControl()) { } + + virtual ~ProducerConsumerValue() {} + + ProducerConsumerControlPtr get_control() const { return _control; } + + /** Returns a string representation of the value. */ + virtual std::string to_string(const std::string& indent = "") const; + + /** Returns true if the two atoms are equal. */ + virtual bool operator==(const Value&) const; + +}; + + +typedef std::shared_ptr ProducerConsumerValuePtr; +static inline ProducerConsumerValuePtr ProducerConsumerValueCast(const ValuePtr& a) + { return std::dynamic_pointer_cast(a); } + +template +static inline std::shared_ptr createProducerConsumerValue(Type&&... args) { + return std::make_shared(std::forward(args)...); +} + + +/** @}*/ +} // namespace opencog + +#endif // _OPENCOG_PRODUCER_CONSUMER_CONTROL_VALUE_H diff --git a/opencog/query/Satisfier.cc b/opencog/query/Satisfier.cc index faac68ee8e..ed757ef126 100644 --- a/opencog/query/Satisfier.cc +++ b/opencog/query/Satisfier.cc @@ -164,7 +164,9 @@ bool ParallelSatisfier::grounding(const HandleMap &var_soln, // std::map::at() can throw. Rethrow for easier deubugging. try { - _satisfying_set.emplace(var_soln.at(_varseq[0])); + Handle h = var_soln.at(_varseq[0]); + _control_value-> get_control()-> produce(h); + _satisfying_set.emplace(h); } catch (...) { @@ -193,6 +195,7 @@ bool ParallelSatisfier::grounding(const HandleMap &var_soln, bool ParallelSatisfier::search_finished(bool done) { + _control_value-> get_control()-> finished(); return done; } diff --git a/opencog/query/Satisfier.h b/opencog/query/Satisfier.h index e7c15a5907..41eb4c3131 100644 --- a/opencog/query/Satisfier.h +++ b/opencog/query/Satisfier.h @@ -32,6 +32,8 @@ #include #include +#include + namespace opencog { /** @@ -125,10 +127,22 @@ class ParallelSatisfier : public virtual InitiateSearchCB, public virtual DefaultPatternMatchCB { + + protected: + Handle _set_node; + ProducerConsumerValuePtr _control_value; + + public: - ParallelSatisfier(AtomSpace* as) : + ParallelSatisfier(AtomSpace* as, Handle set_node) : InitiateSearchCB(as), DefaultPatternMatchCB(as), - max_results(SIZE_MAX) {} + _set_node(set_node), + _control_value(createProducerConsumerValue(set_node->get_name())), + max_results(SIZE_MAX) + { + _set_node->setValue(ProducerConsumerValue::CONTROL_KEY, + CastToValue(_control_value)); + } HandleSeq _varseq; HandleSet _satisfying_set; From 2daac23e7599294d998657c63bf1b5cd606ebc27 Mon Sep 17 00:00:00 2001 From: Alexander Scherbatiy Date: Wed, 14 Aug 2019 16:02:59 +0300 Subject: [PATCH 06/11] Use Closable Queue in QueueValue --- opencog/atoms/atom_types/atom_types.script | 2 +- opencog/atoms/value/CMakeLists.txt | 4 +- opencog/atoms/value/ProducerConsumerValue.h | 109 ------------- ...ProducerConsumerValue.cc => QueueValue.cc} | 42 ++--- opencog/atoms/value/QueueValue.h | 144 ++++++++++++++++++ opencog/query/Satisfier.cc | 4 +- opencog/query/Satisfier.h | 10 +- 7 files changed, 163 insertions(+), 152 deletions(-) delete mode 100644 opencog/atoms/value/ProducerConsumerValue.h rename opencog/atoms/value/{ProducerConsumerValue.cc => QueueValue.cc} (52%) create mode 100644 opencog/atoms/value/QueueValue.h diff --git a/opencog/atoms/atom_types/atom_types.script b/opencog/atoms/atom_types/atom_types.script index 49febaf49f..5fac76272e 100644 --- a/opencog/atoms/atom_types/atom_types.script +++ b/opencog/atoms/atom_types/atom_types.script @@ -23,7 +23,7 @@ FLOAT_VALUE <- VALUE // vector of floats, actually. STRING_VALUE <- VALUE // vector of strings LINK_VALUE <- VALUE // vector of values ("link" holding values) VALUATION <- VALUE // (atom,key,value) triple -PRODUCER_CONSUMER_VALUE <- VALUE // control value for producer/consumer task +QUEUE_VALUE <- VALUE // control value for producer/consumer task // =========================================================== // TruthValues are the subobject classifiers for Atomese; they are used diff --git a/opencog/atoms/value/CMakeLists.txt b/opencog/atoms/value/CMakeLists.txt index f0e9403bfe..9684168dca 100644 --- a/opencog/atoms/value/CMakeLists.txt +++ b/opencog/atoms/value/CMakeLists.txt @@ -7,7 +7,7 @@ ADD_LIBRARY (value RandomStream.cc StreamValue.cc StringValue.cc - ProducerConsumerValue.cc + QueueValue.cc ValueFactory.cc ) @@ -30,7 +30,7 @@ INSTALL (FILES RandomStream.h StreamValue.h StringValue.h - ProducerConsumerValue.h + QueueValue.h ValueFactory.h DESTINATION "include/opencog/atoms/value" ) diff --git a/opencog/atoms/value/ProducerConsumerValue.h b/opencog/atoms/value/ProducerConsumerValue.h deleted file mode 100644 index 079d87b1ef..0000000000 --- a/opencog/atoms/value/ProducerConsumerValue.h +++ /dev/null @@ -1,109 +0,0 @@ -/* - * opencog/atoms/value/ProducerConsumerValue.h - * - * Copyright (C) 2015, 2016 Linas Vepstas - * All Rights Reserved - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License v3 as - * published by the Free Software Foundation and including the exceptions - * at http://opencog.org/wiki/Licenses - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program; if not, write to: - * Free Software Foundation, Inc., - * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - */ - -#ifndef _OPENCOG_PRODUCER_CONSUMER_CONTROL_VALUE_H -#define _OPENCOG_PRODUCER_CONSUMER_CONTROL_VALUE_H - -#include -#include -#include -#include -#include -#include -#include - -namespace opencog -{ - -/** \addtogroup grp_atomspace - * @{ - */ - -class ProducerConsumerControl; - -typedef void (Consumer)(Handle); -typedef std::queue HandleQueue; -typedef std::shared_ptr ProducerConsumerControlPtr; - -/** - * ProducerConsumerControlValues is a structure pass atoms from producer to consumer. - */ -class ProducerConsumerControl -{ -protected: - HandleQueue _queue; - void (*_consume)(Handle); - -public: - - virtual ~ProducerConsumerControl() {} - - virtual void produce(const Handle& h); - - virtual void subscribe(Consumer); - - virtual void finished(); -}; - -class ProducerConsumerValue - : public Value -{ -protected: - std::string _name; - ProducerConsumerControlPtr _control; - -public: - - static const Handle CONTROL_KEY; - - ProducerConsumerValue(const std::string& name) - : Value(PRODUCER_CONSUMER_VALUE), - _name(name), - _control(new ProducerConsumerControl()) { } - - virtual ~ProducerConsumerValue() {} - - ProducerConsumerControlPtr get_control() const { return _control; } - - /** Returns a string representation of the value. */ - virtual std::string to_string(const std::string& indent = "") const; - - /** Returns true if the two atoms are equal. */ - virtual bool operator==(const Value&) const; - -}; - - -typedef std::shared_ptr ProducerConsumerValuePtr; -static inline ProducerConsumerValuePtr ProducerConsumerValueCast(const ValuePtr& a) - { return std::dynamic_pointer_cast(a); } - -template -static inline std::shared_ptr createProducerConsumerValue(Type&&... args) { - return std::make_shared(std::forward(args)...); -} - - -/** @}*/ -} // namespace opencog - -#endif // _OPENCOG_PRODUCER_CONSUMER_CONTROL_VALUE_H diff --git a/opencog/atoms/value/ProducerConsumerValue.cc b/opencog/atoms/value/QueueValue.cc similarity index 52% rename from opencog/atoms/value/ProducerConsumerValue.cc rename to opencog/atoms/value/QueueValue.cc index 46b78e8712..38e4ccaf1e 100644 --- a/opencog/atoms/value/ProducerConsumerValue.cc +++ b/opencog/atoms/value/QueueValue.cc @@ -1,5 +1,5 @@ /* - * opencog/atoms/value/ProducerConsumerValue.cc + * opencog/atoms/value/QueueValue.cc * * Copyright (C) 2015, 2016 Linas Vepstas * All Rights Reserved @@ -20,22 +20,22 @@ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ -#include +#include #include using namespace opencog; -const Handle ProducerConsumerValue::CONTROL_KEY = createNode(CONCEPT_NODE, "PRODUCER_CONSUMER_KEY"); +const Handle QueueValue::CONTROL_KEY = createNode(CONCEPT_NODE, "PRODUCER_CONSUMER_KEY"); -bool ProducerConsumerValue::operator==(const Value& other) const +bool QueueValue::operator==(const Value& other) const { - if (PRODUCER_CONSUMER_VALUE != other.get_type()) return false; + if (QUEUE_VALUE != other.get_type()) return false; - const ProducerConsumerValue* cv = (const ProducerConsumerValue*) &other; + const QueueValue* cv = (const QueueValue*) &other; return _name == cv->_name; } -std::string ProducerConsumerValue::to_string(const std::string& indent) const +std::string QueueValue::to_string(const std::string& indent) const { std::string rv = indent + "(" + nameserver().getTypeName(_type); rv += std::string(" \"") + _name + "\")"; @@ -43,29 +43,5 @@ std::string ProducerConsumerValue::to_string(const std::string& indent) const } // Adds factory when library is loaded. -DEFINE_VALUE_FACTORY(PRODUCER_CONSUMER_VALUE, - createProducerConsumerValue, std::string) - - -// ============================================================== - -void ProducerConsumerControl::produce(const Handle& h) -{ - _queue.push(h); -} - -void ProducerConsumerControl::subscribe(Consumer consume) -{ - _consume = consume; -} - -void ProducerConsumerControl::finished() -{ - if (!_consume) return; - - while (!_queue.empty()) - { - _consume(_queue.front()); - _queue.pop(); - } -} +DEFINE_VALUE_FACTORY(QUEUE_VALUE, + createQueueValue, std::string) diff --git a/opencog/atoms/value/QueueValue.h b/opencog/atoms/value/QueueValue.h new file mode 100644 index 0000000000..c4214b338d --- /dev/null +++ b/opencog/atoms/value/QueueValue.h @@ -0,0 +1,144 @@ +/* + * opencog/atoms/value/QueueValue.h + * + * Copyright (C) 2015, 2016 Linas Vepstas + * All Rights Reserved + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License v3 as + * published by the Free Software Foundation and including the exceptions + * at http://opencog.org/wiki/Licenses + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program; if not, write to: + * Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifndef _OPENCOG_QUEUE_VALUE_H +#define _OPENCOG_QUEUE_VALUE_H + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace opencog +{ + +/** \addtogroup grp_atomspace + * @{ + */ + +template +class ClosableQueue +{ +private: + std::queue _queue; + std::mutex _mutex; + std::condition_variable _cond; + bool _closed = false; + +public: + + void push(const T& item) + { + std::unique_lock mlock(_mutex); + _queue.push(item); + mlock.unlock(); + _cond.notify_one(); + } + + void push(T&& item) + { + std::unique_lock mlock(_mutex); + _queue.push(std::move(item)); + mlock.unlock(); + _cond.notify_one(); + } + + bool pop(T& item) + { + std::unique_lock mlock(_mutex); + + while (_queue.empty()) + { + if(_closed) return false; + _cond.wait(mlock); + } + + item = _queue.front(); + _queue.pop(); + + return true; + } + + void close() + { + std::unique_lock mlock(_mutex); + _closed = true; + mlock.unlock(); + _cond.notify_all(); + } +}; + +typedef ClosableQueue HandleClosableQueue; +typedef std::shared_ptr HandleClosableQueuePtr; + +/** + * ClosableQueueValue contains a queue that allows to pass values from + * a producer to consumer. + */ + +class QueueValue + : public Value +{ +protected: + std::string _name; + HandleClosableQueuePtr _handle_queue; + +public: + + static const Handle CONTROL_KEY; + + QueueValue(const std::string& name) + : Value(QUEUE_VALUE), + _name(name), + _handle_queue(new HandleClosableQueue()) { } + + virtual ~QueueValue() {} + + HandleClosableQueuePtr get_queue() const { return _handle_queue; } + + /** Returns a string representation of the value. */ + virtual std::string to_string(const std::string& indent = "") const; + + /** Returns true if the two atoms are equal. */ + virtual bool operator==(const Value&) const; +}; + + +typedef std::shared_ptr QueueValuePtr; +static inline QueueValuePtr QueueValueCast(const ValuePtr& a) + { return std::dynamic_pointer_cast(a); } + +template +static inline std::shared_ptr createQueueValue(Type&&... args) { + return std::make_shared(std::forward(args)...); +} + + +/** @}*/ +} // namespace opencog + +#endif // _OPENCOG_QUEUE_VALUE_H diff --git a/opencog/query/Satisfier.cc b/opencog/query/Satisfier.cc index ed757ef126..5bc4de476f 100644 --- a/opencog/query/Satisfier.cc +++ b/opencog/query/Satisfier.cc @@ -165,7 +165,7 @@ bool ParallelSatisfier::grounding(const HandleMap &var_soln, try { Handle h = var_soln.at(_varseq[0]); - _control_value-> get_control()-> produce(h); + _queue_value-> get_queue() -> push(h); _satisfying_set.emplace(h); } catch (...) @@ -195,7 +195,7 @@ bool ParallelSatisfier::grounding(const HandleMap &var_soln, bool ParallelSatisfier::search_finished(bool done) { - _control_value-> get_control()-> finished(); + _queue_value -> get_queue()-> close(); return done; } diff --git a/opencog/query/Satisfier.h b/opencog/query/Satisfier.h index 41eb4c3131..2ce95a2ae8 100644 --- a/opencog/query/Satisfier.h +++ b/opencog/query/Satisfier.h @@ -32,7 +32,7 @@ #include #include -#include +#include namespace opencog { @@ -130,18 +130,18 @@ class ParallelSatisfier : protected: Handle _set_node; - ProducerConsumerValuePtr _control_value; + QueueValuePtr _queue_value; public: ParallelSatisfier(AtomSpace* as, Handle set_node) : InitiateSearchCB(as), DefaultPatternMatchCB(as), _set_node(set_node), - _control_value(createProducerConsumerValue(set_node->get_name())), + _queue_value(createQueueValue(set_node->get_name())), max_results(SIZE_MAX) { - _set_node->setValue(ProducerConsumerValue::CONTROL_KEY, - CastToValue(_control_value)); + _set_node->setValue(QueueValue::CONTROL_KEY, + CastToValue(_queue_value)); } HandleSeq _varseq; From c8fafe1dee96db98f3859c500ea26d1488b87db6 Mon Sep 17 00:00:00 2001 From: Alexander Scherbatiy Date: Wed, 14 Aug 2019 16:21:58 +0300 Subject: [PATCH 07/11] Rename CONTROL_KEY to QUEUE_VALUE_KEY --- opencog/atoms/value/QueueValue.cc | 2 +- opencog/atoms/value/QueueValue.h | 2 +- opencog/query/Satisfier.h | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/opencog/atoms/value/QueueValue.cc b/opencog/atoms/value/QueueValue.cc index 38e4ccaf1e..fc5f9c5b9c 100644 --- a/opencog/atoms/value/QueueValue.cc +++ b/opencog/atoms/value/QueueValue.cc @@ -25,7 +25,7 @@ using namespace opencog; -const Handle QueueValue::CONTROL_KEY = createNode(CONCEPT_NODE, "PRODUCER_CONSUMER_KEY"); +const Handle QueueValue::QUEUE_VALUE_KEY = createNode(CONCEPT_NODE, "QUEUE_VALUE_KEY"); bool QueueValue::operator==(const Value& other) const { diff --git a/opencog/atoms/value/QueueValue.h b/opencog/atoms/value/QueueValue.h index c4214b338d..9b69cbcc18 100644 --- a/opencog/atoms/value/QueueValue.h +++ b/opencog/atoms/value/QueueValue.h @@ -109,7 +109,7 @@ class QueueValue public: - static const Handle CONTROL_KEY; + static const Handle QUEUE_VALUE_KEY; QueueValue(const std::string& name) : Value(QUEUE_VALUE), diff --git a/opencog/query/Satisfier.h b/opencog/query/Satisfier.h index 2ce95a2ae8..c9937b1c75 100644 --- a/opencog/query/Satisfier.h +++ b/opencog/query/Satisfier.h @@ -140,7 +140,7 @@ class ParallelSatisfier : _queue_value(createQueueValue(set_node->get_name())), max_results(SIZE_MAX) { - _set_node->setValue(QueueValue::CONTROL_KEY, + _set_node->setValue(QueueValue::QUEUE_VALUE_KEY, CastToValue(_queue_value)); } From 8b086fc07228451f771850c3d5d01785ad848b35 Mon Sep 17 00:00:00 2001 From: Alexander Scherbatiy Date: Wed, 14 Aug 2019 16:48:26 +0300 Subject: [PATCH 08/11] Handle SetNode with QueueValue in PutLink --- opencog/atoms/core/PutLink.cc | 15 +++++++++++---- opencog/atoms/pattern/ParallelGetLink.cc | 17 +++++++++-------- 2 files changed, 20 insertions(+), 12 deletions(-) diff --git a/opencog/atoms/core/PutLink.cc b/opencog/atoms/core/PutLink.cc index 0df724a032..d29cabcebb 100644 --- a/opencog/atoms/core/PutLink.cc +++ b/opencog/atoms/core/PutLink.cc @@ -26,6 +26,7 @@ #include "LambdaLink.h" #include "PutLink.h" #include +#include using namespace opencog; @@ -452,13 +453,19 @@ Handle PutLink::do_reduce(void) const if (SET_NODE == vtype) { + // If the argument is SetNode, then process atoms from queue + // stored in SetNode value AtomSpace* as = getAtomSpace(); - for (const LinkPtr& lp : args->getIncomingSetByType(MEMBER_LINK)) + ValuePtr value = args->getValue(QueueValue::QUEUE_VALUE_KEY); + if (as && value) { - Handle h = lp->getOutgoingAtom(0); - if (as) as->add_atom(reddy(subs, {h})); + HandleClosableQueuePtr queue = QueueValueCast(value)->get_queue(); + Handle h; + while (queue->pop(h)) + { + as->add_atom(reddy(subs, {h})); + } } - return args; } return reddy(subs, {args}); diff --git a/opencog/atoms/pattern/ParallelGetLink.cc b/opencog/atoms/pattern/ParallelGetLink.cc index ac524957a3..9ef7bd2537 100644 --- a/opencog/atoms/pattern/ParallelGetLink.cc +++ b/opencog/atoms/pattern/ParallelGetLink.cc @@ -67,23 +67,24 @@ HandleSet ParallelGetLink::do_execute(AtomSpace* as, bool silent) ValuePtr ParallelGetLink::execute(AtomSpace* as, bool silent) { - HandleSet handle_set = do_execute(as, silent); +#define PLACE_RESULTS_IN_ATOMSPACE +#ifdef PLACE_RESULTS_IN_ATOMSPACE + // Shoot. XXX FIXME. Most of the unit tests require that the atom + // that we return is in the atomspace. But it would be nice if we + // could defer this indefinitely, until its really needed. + + HandleSet handle_set = do_execute(as, silent); for (auto h: handle_set) { + HandleSeq handle_seq; handle_seq.push_back(h); handle_seq.push_back(_target); Handle member_link = createLink(handle_seq, MEMBER_LINK); - -#define PLACE_RESULTS_IN_ATOMSPACE -#ifdef PLACE_RESULTS_IN_ATOMSPACE - // Shoot. XXX FIXME. Most of the unit tests require that the atom - // that we return is in the atomspace. But it would be nice if we - // could defer this indefinitely, until its really needed. if (as) member_link = as->add_atom(member_link); -#endif /* PLACE_RESULTS_IN_ATOMSPACE */ } +#endif /* PLACE_RESULTS_IN_ATOMSPACE */ return _target; } From 50fbc7b08477e88957577f21f29dca1f2d798964 Mon Sep 17 00:00:00 2001 From: Alexander Scherbatiy Date: Wed, 14 Aug 2019 18:33:42 +0300 Subject: [PATCH 09/11] Add ParallelGetLinkUTest --- opencog/atoms/atom_types/atom_types.script | 2 +- tests/atoms/CMakeLists.txt | 2 + tests/atoms/ParallelGetLinkUTest.cxxtest | 125 +++++++++++++++++++++ tests/atoms/parallel-get.scm | 22 ++++ 4 files changed, 150 insertions(+), 1 deletion(-) create mode 100644 tests/atoms/ParallelGetLinkUTest.cxxtest create mode 100644 tests/atoms/parallel-get.scm diff --git a/opencog/atoms/atom_types/atom_types.script b/opencog/atoms/atom_types/atom_types.script index 5fac76272e..7eb46e0d7b 100644 --- a/opencog/atoms/atom_types/atom_types.script +++ b/opencog/atoms/atom_types/atom_types.script @@ -23,7 +23,7 @@ FLOAT_VALUE <- VALUE // vector of floats, actually. STRING_VALUE <- VALUE // vector of strings LINK_VALUE <- VALUE // vector of values ("link" holding values) VALUATION <- VALUE // (atom,key,value) triple -QUEUE_VALUE <- VALUE // control value for producer/consumer task +QUEUE_VALUE <- VALUE // queue value to pass results from one link to another // =========================================================== // TruthValues are the subobject classifiers for Atomese; they are used diff --git a/tests/atoms/CMakeLists.txt b/tests/atoms/CMakeLists.txt index b255ff8886..cce36ce3a4 100644 --- a/tests/atoms/CMakeLists.txt +++ b/tests/atoms/CMakeLists.txt @@ -28,6 +28,8 @@ IF(HAVE_GUILE) TARGET_LINK_LIBRARIES(ScopeLinkUTest smob atomspace) ADD_CXXTEST(PutLinkUTest) TARGET_LINK_LIBRARIES(PutLinkUTest execution smob atomspace) + ADD_CXXTEST(ParallelGetLinkUTest) + TARGET_LINK_LIBRARIES(ParallelGetLinkUTest execution smob atomspace) ADD_CXXTEST(QuotationUTest) TARGET_LINK_LIBRARIES(QuotationUTest execution smob atomspace) ADD_CXXTEST(FormulaUTest) diff --git a/tests/atoms/ParallelGetLinkUTest.cxxtest b/tests/atoms/ParallelGetLinkUTest.cxxtest new file mode 100644 index 0000000000..0ff76b550c --- /dev/null +++ b/tests/atoms/ParallelGetLinkUTest.cxxtest @@ -0,0 +1,125 @@ +/* + * tests/atoms/ParallelGetLinkUTest.cxxtest + * + * Copyright (C) 2015,2017 Linas Vepstas + * Copyright (C) 2016 Nil Geiswieller + * All Rights Reserved + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License v3 as + * published by the Free Software Foundation and including the exceptions + * at http://opencog.org/wiki/Licenses + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program; if not, write to: + * Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#include +#include +#include +#include +#include +#include + +#include + +using namespace opencog; + +// Test the ParallelGetLink. +// +class ParallelGetLinkUTest: public CxxTest::TestSuite +{ +private: + AtomSpace _as; + SchemeEval _eval; + +public: + ParallelGetLinkUTest() : _eval(&_as) + { + logger().set_print_to_stdout_flag(true); + logger().set_level(Logger::INFO); + logger().set_timestamp_flag(false); + logger().set_sync_flag(true); // interleave with printf correctly + + _eval.eval("(add-to-load-path \"" PROJECT_SOURCE_DIR "\")"); + + } + + void setUp() {} + + void tearDown() {} + + void test_get(); +}; + +#define N _as.add_node +#define L _as.add_link + +/** + * + * Test + * + * (Inheritance (Concept "ball-1") (Concept "green")) + * (Inheritance (Concept "ball-2") (Concept "red")) + * (Inheritance (Concept "ball-3") (Concept "green")) + * (Inheritance (Concept "ball-4") (Concept "red")) + * + * (ParallelGet + * (Inheritance + * (Variable "$BALL") + * (Concept "green"))) + * + * which should return (SetNode "S") + * and produce MemberLinks in atomspace + * + * (Member + * (Concept "ball-1") + * (SetNode "S")) + * (Member + * (Concept "ball-3") + * (SetNode "S")) +*/ +void ParallelGetLinkUTest::test_get() +{ + logger().info("BEGIN TEST: %s", __FUNCTION__); + + std::string rs = _eval.eval("(load-from-path \"tests/atoms/parallel-get.scm\")"); + logger().debug() << "rs = " << rs; + + Instantiator inst(&_as); + Handle put = _eval.eval_h("parallel-get"); + Handle result = HandleCast(inst.execute(put)); + Handle expected = _eval.eval_h("expected-parallel-get"); + + logger().debug() << "result = " << oc_to_string(result); + logger().debug() << "expected = " << oc_to_string(expected); + + TS_ASSERT_EQUALS(result, expected); + + HandleSeq hs; + _as.get_handles_by_type(hs, MEMBER_LINK, false); + + Handle set_node = N(SET_NODE, "S"); + Handle link_1 = L(MEMBER_LINK, N(CONCEPT_NODE, "ball-1"), set_node); + Handle link_2 = L(MEMBER_LINK, N(CONCEPT_NODE, "ball-3"), set_node); + + int found_count = 0; + for (auto& h : hs) + { + if (link_1 == h || link_2 == h) + { + found_count++; + } + } + + TS_ASSERT_EQUALS(found_count, 2); + + logger().info("END TEST: %s", __FUNCTION__); +} \ No newline at end of file diff --git a/tests/atoms/parallel-get.scm b/tests/atoms/parallel-get.scm new file mode 100644 index 0000000000..354a956a8c --- /dev/null +++ b/tests/atoms/parallel-get.scm @@ -0,0 +1,22 @@ +(Inheritance (Concept "ball-1") (Concept "green")) +(Inheritance (Concept "ball-2") (Concept "red")) +(Inheritance (Concept "ball-3") (Concept "green")) +(Inheritance (Concept "ball-4") (Concept "red")) + +(define parallel-get + (ParallelGet + (Variable "$BALL") + (Inheritance + (Variable "$BALL") + (Concept "green")) + (SetNode "S"))) + +(define expected-parallel-get + (SetNode "S")) + +(define expected-members + (Get + (Member + (Variable "$BALL") + (Variable "$SET")))) + From 325c67f6381dcd450d448a54003d571a9bc616b3 Mon Sep 17 00:00:00 2001 From: Alexander Scherbatiy Date: Wed, 14 Aug 2019 19:17:40 +0300 Subject: [PATCH 10/11] Add ParallelGet get/put test --- opencog/atoms/core/PutLink.cc | 24 +++++---- tests/atoms/ParallelGetLinkUTest.cxxtest | 63 ++++++++++++++++++++++-- tests/atoms/parallel-get.scm | 18 +++++-- 3 files changed, 89 insertions(+), 16 deletions(-) diff --git a/opencog/atoms/core/PutLink.cc b/opencog/atoms/core/PutLink.cc index d29cabcebb..6b1cf1e2a2 100644 --- a/opencog/atoms/core/PutLink.cc +++ b/opencog/atoms/core/PutLink.cc @@ -257,6 +257,16 @@ static inline Handle reddy(PrenexLinkPtr& subs, const HandleSeq& oset) return subs->beta_reduce(oset); } +static inline void reddy(HandleSeq& bset, PrenexLinkPtr& subs, const HandleSeq& oset) +{ + try + { + bset.emplace_back(reddy(subs, oset)); + } + catch (const TypeCheckException& ex) {} + +} + // If arg is executable, then run it, and unwrap the set link, too. // We unwrap the SetLinks cause that is what GetLinks return. static inline Handle expand(const Handle& arg, bool silent) @@ -440,13 +450,7 @@ Handle PutLink::do_reduce(void) const HandleSeq bset; for (const Handle& h : args->getOutgoingSet()) { - HandleSeq oset; - oset.emplace_back(h); - try - { - bset.emplace_back(reddy(subs, oset)); - } - catch (const TypeCheckException& ex) {} + reddy(bset, subs, {h}); } return createLink(bset, SET_LINK); } @@ -459,12 +463,14 @@ Handle PutLink::do_reduce(void) const ValuePtr value = args->getValue(QueueValue::QUEUE_VALUE_KEY); if (as && value) { - HandleClosableQueuePtr queue = QueueValueCast(value)->get_queue(); Handle h; + HandleSeq bset; + HandleClosableQueuePtr queue = QueueValueCast(value)->get_queue(); while (queue->pop(h)) { - as->add_atom(reddy(subs, {h})); + reddy(bset, subs, {h}); } + return createLink(bset, SET_LINK); } } diff --git a/tests/atoms/ParallelGetLinkUTest.cxxtest b/tests/atoms/ParallelGetLinkUTest.cxxtest index 0ff76b550c..57c7555d2d 100644 --- a/tests/atoms/ParallelGetLinkUTest.cxxtest +++ b/tests/atoms/ParallelGetLinkUTest.cxxtest @@ -57,6 +57,7 @@ public: void tearDown() {} void test_get(); + void test_get_put(); }; #define N _as.add_node @@ -94,8 +95,8 @@ void ParallelGetLinkUTest::test_get() logger().debug() << "rs = " << rs; Instantiator inst(&_as); - Handle put = _eval.eval_h("parallel-get"); - Handle result = HandleCast(inst.execute(put)); + Handle get = _eval.eval_h("parallel-get"); + Handle result = HandleCast(inst.execute(get)); Handle expected = _eval.eval_h("expected-parallel-get"); logger().debug() << "result = " << oc_to_string(result); @@ -122,4 +123,60 @@ void ParallelGetLinkUTest::test_get() TS_ASSERT_EQUALS(found_count, 2); logger().info("END TEST: %s", __FUNCTION__); -} \ No newline at end of file +} + +/** + * + * Test + * + * (Inheritance (Concept "ball-1") (Concept "green")) + * (Inheritance (Concept "ball-2") (Concept "red")) + * (Inheritance (Concept "ball-3") (Concept "green")) + * (Inheritance (Concept "ball-4") (Concept "red")) + * + * (ParallelGet + * (Inheritance + * (Variable "$BALL") + * (Concept "green"))) + * + * (Put + * (Variable "$BALL") + * (Inheritance + * (Variable "$BALL") + * (Concept "selected-balls")) + * (SetNode "S")) + * + * The putLink should return + * + * (SetLink + * (InheritanceLink + * (ConceptNode "ball-1") + * (ConceptNode "selected-balls")) + * (InheritanceLink + * (ConceptNode "ball-3") + * (ConceptNode "selected-balls"))) +*/ + void ParallelGetLinkUTest::test_get_put() + { + logger().info("BEGIN TEST: %s", __FUNCTION__); + + std::string rs = _eval.eval("(load-from-path \"tests/atoms/parallel-get.scm\")"); + logger().debug() << "rs = " << rs; + + Instantiator inst(&_as); + Handle get = _eval.eval_h("parallel-get"); + Handle put = _eval.eval_h("parallel-put"); + inst.execute(get); + Handle result = HandleCast(inst.execute(put)); + Handle expected = _eval.eval_h("expected-parallel-put"); + + logger().debug() << "result = " << oc_to_string(result); + logger().debug() << "expected = " << oc_to_string(expected); + + printf("result: %s\n", result->to_short_string().c_str()); + printf("expected: %s\n", expected->to_short_string().c_str()); + + TS_ASSERT_EQUALS(result, expected); + + logger().info("END TEST: %s", __FUNCTION__); + } diff --git a/tests/atoms/parallel-get.scm b/tests/atoms/parallel-get.scm index 354a956a8c..f4efa09685 100644 --- a/tests/atoms/parallel-get.scm +++ b/tests/atoms/parallel-get.scm @@ -14,9 +14,19 @@ (define expected-parallel-get (SetNode "S")) -(define expected-members - (Get - (Member +(define parallel-put + (Put + (Variable "$BALL") + (Inheritance (Variable "$BALL") - (Variable "$SET")))) + (Concept "selected-balls")) + (SetNode "S"))) +(define expected-parallel-put + (SetLink + (InheritanceLink + (ConceptNode "ball-1") + (ConceptNode "selected-balls")) + (InheritanceLink + (ConceptNode "ball-3") + (ConceptNode "selected-balls")))) From b89c25f5311023aa818ec6c7fc5e296d52766913 Mon Sep 17 00:00:00 2001 From: Alexander Scherbatiy Date: Wed, 14 Aug 2019 19:45:06 +0300 Subject: [PATCH 11/11] Update ParallelGetLink link comments --- opencog/atoms/atom_types/atom_types.script | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/opencog/atoms/atom_types/atom_types.script b/opencog/atoms/atom_types/atom_types.script index 7eb46e0d7b..ffcd9eb997 100644 --- a/opencog/atoms/atom_types/atom_types.script +++ b/opencog/atoms/atom_types/atom_types.script @@ -23,7 +23,7 @@ FLOAT_VALUE <- VALUE // vector of floats, actually. STRING_VALUE <- VALUE // vector of strings LINK_VALUE <- VALUE // vector of values ("link" holding values) VALUATION <- VALUE // (atom,key,value) triple -QUEUE_VALUE <- VALUE // queue value to pass results from one link to another +QUEUE_VALUE <- VALUE // queue value to pass results from one atom to another // =========================================================== // TruthValues are the subobject classifiers for Atomese; they are used @@ -387,9 +387,9 @@ SATISFYING_LINK <- PATTERN_LINK // QueryLink is identical to BindLink, except it returns a LinkValue // holding the result, instead of a SetLink. (Less atomspace pollution). GET_LINK <- SATISFYING_LINK // Finds all groundings, returns them -PARALLEL_GET_LINK <- SATISFYING_LINK // Finds all groundings, returns them QUERY_LINK <- SATISFYING_LINK // Finds all groundings, substitutes. BIND_LINK <- QUERY_LINK // Finds all groundings, substitutes. +PARALLEL_GET_LINK <- SATISFYING_LINK // Finds all groundings, puts them to queue // Adjoint to the GetLink. This is "adjoint" in the sense that the roles // of the pattern and the grounding are reversed: given a grounding, the